From 1b9f5470ec750a24d27825e2f555020d2c6d9a7d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 11 Oct 2023 17:44:48 +0200 Subject: [PATCH] GH-37910: [Java][Integration] Implement C Data Interface integration testing --- ci/scripts/integration_arrow.sh | 4 +- dev/archery/archery/integration/cdata.py | 18 +- dev/archery/archery/integration/datagen.py | 1 - dev/archery/archery/integration/runner.py | 23 +-- dev/archery/archery/integration/tester.py | 104 +++++----- dev/archery/archery/integration/tester_cpp.py | 11 -- .../archery/integration/tester_csharp.py | 19 +- dev/archery/archery/integration/tester_go.py | 18 +- .../archery/integration/tester_java.py | 177 +++++++++++++++++- docker-compose.yml | 11 +- .../arrow/c/BufferImportTypeVisitor.java | 4 +- .../main/java/org/apache/arrow/c/Format.java | 4 + .../org/apache/arrow/c/SchemaImporter.java | 2 +- .../org/apache/arrow/c/DictionaryTest.java | 4 +- .../java/org/apache/arrow/c/StreamTest.java | 2 +- .../org/apache/arrow/vector/NullVector.java | 1 + .../vector/compare/RangeEqualsVisitor.java | 6 +- .../vector/dictionary/DictionaryProvider.java | 29 ++- .../arrow/vector/ipc/JsonFileReader.java | 38 +++- .../vector/ipc/message/ArrowRecordBatch.java | 4 +- .../apache/arrow/vector/util/Validator.java | 26 +++ 21 files changed, 372 insertions(+), 134 deletions(-) diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index 289d376a4db9b..2861b1c09d479 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -23,8 +23,8 @@ arrow_dir=${1} gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration pip install -e $arrow_dir/dev/archery[integration] -# For C# C Data Interface testing -pip install pythonnet +# For C Data Interface testing +pip install jpype1 pythonnet # Get more detailed context on crashes export PYTHONFAULTHANDLER=1 diff --git a/dev/archery/archery/integration/cdata.py b/dev/archery/archery/integration/cdata.py index c201f5f867f8f..8e5550fcdb9c5 100644 --- a/dev/archery/archery/integration/cdata.py +++ b/dev/archery/archery/integration/cdata.py @@ -80,6 +80,15 @@ def ffi() -> cffi.FFI: return ffi +def _release_memory_steps(exporter: CDataExporter, importer: CDataImporter): + yield + for i in range(max(exporter.required_gc_runs, importer.required_gc_runs)): + importer.run_gc() + yield + exporter.run_gc() + yield + + @contextmanager def check_memory_released(exporter: CDataExporter, importer: CDataImporter): """ @@ -96,12 +105,13 @@ def check_memory_released(exporter: CDataExporter, importer: CDataImporter): if do_check: before = exporter.record_allocation_state() yield - # We don't use a `finally` clause: if the enclosed block raised an - # exception, no need to add another one. + # Only check for memory state if `yield` didn't raise. if do_check: - ok = exporter.compare_allocation_state(before, importer.gc_until) - if not ok: + for _ in _release_memory_steps(exporter, importer): after = exporter.record_allocation_state() + if after == before: + break + if after != before: raise RuntimeError( f"Memory was not released correctly after roundtrip: " f"before = {before}, after = {after} (should have been equal)") diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index f229012366e1f..7635cfd98feda 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1722,7 +1722,6 @@ def generate_dictionary_unsigned_case(): # TODO: JavaScript does not support uint64 dictionary indices, so disabled # for now - # dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3') fields = [ DictionaryField('f0', get_field('', 'uint8'), dict0), diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index eb2e26951cd88..841633f94cdba 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -421,17 +421,18 @@ def _compare_c_data_implementations( # Serial execution is required for proper memory accounting serial = True - exporter = producer.make_c_data_exporter() - importer = consumer.make_c_data_importer() - - case_runner = partial(self._run_c_schema_test_case, producer, consumer, - exporter, importer) - self._run_test_cases(case_runner, self.json_files, serial=serial) - - if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER: - case_runner = partial(self._run_c_array_test_cases, producer, consumer, - exporter, importer) - self._run_test_cases(case_runner, self.json_files, serial=serial) + with producer.make_c_data_exporter() as exporter: + with consumer.make_c_data_importer() as importer: + case_runner = partial(self._run_c_schema_test_case, + producer, consumer, + exporter, importer) + self._run_test_cases(case_runner, self.json_files, serial=serial) + + if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER: + case_runner = partial(self._run_c_array_test_cases, + producer, consumer, + exporter, importer) + self._run_test_cases(case_runner, self.json_files, serial=serial) def _run_c_schema_test_case(self, producer: Tester, consumer: Tester, diff --git a/dev/archery/archery/integration/tester.py b/dev/archery/archery/integration/tester.py index 6cde20e61b321..eadb953a61b50 100644 --- a/dev/archery/archery/integration/tester.py +++ b/dev/archery/archery/integration/tester.py @@ -68,52 +68,52 @@ def supports_releasing_memory(self) -> bool: Whether the implementation is able to release memory deterministically. Here, "release memory" means that, after the `release` callback of - a C Data Interface export is called, `compare_allocation_state` is - able to trigger the deallocation of the memory underlying the export - (for example buffer data). + a C Data Interface export is called, `run_gc` is able to trigger + the deallocation of the memory underlying the export (such as buffer data). - If false, then `record_allocation_state` and `compare_allocation_state` - are allowed to raise NotImplementedError. + If false, then `record_allocation_state` is allowed to raise + NotImplementedError. """ def record_allocation_state(self) -> object: """ - Record the current memory allocation state. + Return the current memory allocation state. Returns ------- state : object - Opaque object representing the allocation state, - for example the number of allocated bytes. + Equality-comparable object representing the allocation state, + for example the number of allocated or exported bytes. """ raise NotImplementedError - def compare_allocation_state(self, recorded: object, - gc_until: typing.Callable[[_Predicate], bool] - ) -> bool: + def run_gc(self): """ - Compare the current memory allocation state with the recorded one. + Run the GC if necessary. - Parameters - ---------- - recorded : object - The previous allocation state returned by - `record_allocation_state()` - gc_until : callable - A callable itself accepting a callable predicate, and - returning a boolean. - `gc_until` should try to release memory until the predicate - becomes true, or until it decides to give up. The final value - of the predicate should be returned. - `gc_until` is typically provided by the C Data Interface importer. + This should ensure that any temporary objects and data created by + previous exporter calls are collected. + """ - Returns - ------- - success : bool - Whether memory allocation state finally reached its previously - recorded value. + @property + def required_gc_runs(self): """ - raise NotImplementedError + The maximum number of calls to `run_gc` that need to be issued to + ensure proper deallocation. Some implementations may require this + to be greater than one. + """ + return 1 + + def close(self): + """ + Final cleanup after usage. + """ + + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() class CDataImporter(ABC): @@ -163,32 +163,40 @@ def supports_releasing_memory(self) -> bool: """ Whether the implementation is able to release memory deterministically. - Here, "release memory" means calling the `release` callback of - a C Data Interface export (which should then trigger a deallocation - mechanism on the exporter). + Here, "release memory" means `run_gc()` is able to trigger the + `release` callback of a C Data Interface export (which would then + induce a deallocation mechanism on the exporter). + """ - If false, then `gc_until` is allowed to raise NotImplementedError. + def run_gc(self): """ + Run the GC if necessary. - def gc_until(self, predicate: _Predicate): + This should ensure that any imported data has its release callback called. """ - Try to release memory until the predicate becomes true, or fail. - Depending on the CDataImporter implementation, this may for example - try once, or run a garbage collector a given number of times, or - any other implementation-specific strategy for releasing memory. + @property + def required_gc_runs(self): + """ + The maximum number of calls to `run_gc` that need to be issued to + ensure release callbacks are triggered. Some implementations may + require this to be greater than one. + """ + return 1 - The running time should be kept reasonable and compatible with - execution of multiple C Data integration tests. + def close(self): + """ + Final cleanup after usage. + """ - This should not raise if `supports_releasing_memory` is true. + def __enter__(self): + return self - Returns - ------- - success : bool - The final value of the predicate. - """ - raise NotImplementedError + def __exit__(self, *exc): + # Make sure any exported data is released. + for i in range(self.required_gc_runs): + self.run_gc() + self.close() class Tester: diff --git a/dev/archery/archery/integration/tester_cpp.py b/dev/archery/archery/integration/tester_cpp.py index 866fc225d220a..658e71330155e 100644 --- a/dev/archery/archery/integration/tester_cpp.py +++ b/dev/archery/archery/integration/tester_cpp.py @@ -223,13 +223,6 @@ def supports_releasing_memory(self): def record_allocation_state(self): return self.dll.ArrowCpp_BytesAllocated() - def compare_allocation_state(self, recorded, gc_until): - def pred(): - # No GC on our side, so just compare allocation state - return self.record_allocation_state() == recorded - - return gc_until(pred) - class CppCDataImporter(CDataImporter, _CDataBase): @@ -247,7 +240,3 @@ def import_batch_and_compare_to_json(self, json_path, num_batch, @property def supports_releasing_memory(self): return True - - def gc_until(self, predicate): - # No GC on our side, so can evaluate predicate immediately - return predicate() diff --git a/dev/archery/archery/integration/tester_csharp.py b/dev/archery/archery/integration/tester_csharp.py index 83b07495f9907..7dca525673ba6 100644 --- a/dev/archery/archery/integration/tester_csharp.py +++ b/dev/archery/archery/integration/tester_csharp.py @@ -16,7 +16,6 @@ # under the License. from contextlib import contextmanager -import gc import os from . import cdata @@ -82,6 +81,10 @@ def _read_batch_from_json(self, json_path, num_batch): schema = jf.Schema.ToArrow() return schema, jf.Batches[num_batch].ToArrow(schema) + def _run_gc(self): + from Apache.Arrow.IntegrationTest import CDataInterface + CDataInterface.RunGC() + class CSharpCDataExporter(CDataExporter, _CDataBase): @@ -105,6 +108,9 @@ def supports_releasing_memory(self): # XXX the C# GC doesn't give reliable allocation measurements return False + def run_gc(self): + self._run_gc() + class CSharpCDataImporter(CDataImporter, _CDataBase): @@ -134,15 +140,8 @@ def import_batch_and_compare_to_json(self, json_path, num_batch, def supports_releasing_memory(self): return True - def gc_until(self, predicate): - from Apache.Arrow.IntegrationTest import CDataInterface - for i in range(3): - if predicate(): - return True - # Collect any C# objects hanging around through Python - gc.collect() - CDataInterface.RunGC() - return predicate() + def run_gc(self): + self._run_gc() class CSharpTester(Tester): diff --git a/dev/archery/archery/integration/tester_go.py b/dev/archery/archery/integration/tester_go.py index b7af233f5d6e2..2b3dc3a1be336 100644 --- a/dev/archery/archery/integration/tester_go.py +++ b/dev/archery/archery/integration/tester_go.py @@ -200,9 +200,6 @@ def _check_go_error(self, go_error): finally: self.dll.ArrowGo_FreeError(go_error) - def _run_gc(self): - self.dll.ArrowGo_RunGC() - class GoCDataExporter(CDataExporter, _CDataBase): # Note: the Arrow Go C Data export functions expect their output @@ -225,14 +222,10 @@ def supports_releasing_memory(self): return True def record_allocation_state(self): - self._run_gc() return self.dll.ArrowGo_BytesAllocated() - def compare_allocation_state(self, recorded, gc_until): - def pred(): - return self.record_allocation_state() == recorded - - return gc_until(pred) + # Note: no need to call the Go GC anywhere thanks to Arrow Go's + # explicit refcounting. class GoCDataImporter(CDataImporter, _CDataBase): @@ -252,10 +245,3 @@ def import_batch_and_compare_to_json(self, json_path, num_batch, @property def supports_releasing_memory(self): return True - - def gc_until(self, predicate): - for i in range(10): - if predicate(): - return True - self._run_gc() - return False diff --git a/dev/archery/archery/integration/tester_java.py b/dev/archery/archery/integration/tester_java.py index 45855079eb72e..5684798d794ad 100644 --- a/dev/archery/archery/integration/tester_java.py +++ b/dev/archery/archery/integration/tester_java.py @@ -16,10 +16,12 @@ # under the License. import contextlib +import functools import os import subprocess -from .tester import Tester +from . import cdata +from .tester import Tester, CDataExporter, CDataImporter from .util import run_cmd, log from ..utils.source import ARROW_ROOT_DEFAULT @@ -32,6 +34,8 @@ def load_version_from_pom(): return version_tag.text +# XXX Should we add "-Darrow.memory.debug.allocator=true"? It adds a couple +# minutes to total CPU usage of the integration test suite. _JAVA_OPTS = [ "-Dio.netty.tryReflectionSetAccessible=true", "-Darrow.struct.conflict.policy=CONFLICT_APPEND", @@ -42,18 +46,25 @@ def load_version_from_pom(): "ARROW_JAVA_INTEGRATION_JAR", os.path.join( ARROW_ROOT_DEFAULT, - "java/tools/target/arrow-tools-{}-" - "jar-with-dependencies.jar".format(_arrow_version), - ), + "java/tools/target", + f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar" + ) +) +_ARROW_C_DATA_JAR = os.environ.get( + "ARROW_C_DATA_JAVA_INTEGRATION_JAR", + os.path.join( + ARROW_ROOT_DEFAULT, + "java/c/target", + f"arrow-c-data-{_arrow_version}.jar" + ) ) _ARROW_FLIGHT_JAR = os.environ.get( "ARROW_FLIGHT_JAVA_INTEGRATION_JAR", os.path.join( ARROW_ROOT_DEFAULT, - "java/flight/flight-integration-tests/target/" - "flight-integration-tests-{}-jar-with-dependencies.jar".format( - _arrow_version), - ), + "java/flight/flight-integration-tests/target", + f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar" + ) ) _ARROW_FLIGHT_SERVER = ( "org.apache.arrow.flight.integration.tests.IntegrationTestServer" @@ -63,11 +74,155 @@ def load_version_from_pom(): ) +@functools.lru_cache +def setup_jpype(): + import jpype + jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}" + # XXX Didn't manage to tone down the logging level here (DEBUG -> INFO) + jpype.startJVM(jpype.getDefaultJVMPath(), + "-Djava.class.path=" + jar_path, *_JAVA_OPTS) + + +class _CDataBase: + + def __init__(self, debug, args): + import jpype + self.debug = debug + self.args = args + self.ffi = cdata.ffi() + setup_jpype() + # JPype pointers to java.io, org.apache.arrow... + self.java_io = jpype.JPackage("java").io + self.java_arrow = jpype.JPackage("org").apache.arrow + self.java_allocator = self._make_java_allocator() + + def _pointer_to_int(self, c_ptr): + return int(self.ffi.cast('uintptr_t', c_ptr)) + + def _wrap_c_schema_ptr(self, c_schema_ptr): + return self.java_arrow.c.ArrowSchema.wrap( + self._pointer_to_int(c_schema_ptr)) + + def _wrap_c_array_ptr(self, c_array_ptr): + return self.java_arrow.c.ArrowArray.wrap( + self._pointer_to_int(c_array_ptr)) + + def _make_java_allocator(self): + # Return a new allocator + return self.java_arrow.memory.RootAllocator() + + def _assert_schemas_equal(self, expected, actual): + # XXX This is fragile for dictionaries, as Schema.equals compares + # dictionary ids. + self.java_arrow.vector.util.Validator.compareSchemas( + expected, actual) + + def _assert_batches_equal(self, expected, actual): + self.java_arrow.vector.util.Validator.compareVectorSchemaRoot( + expected, actual) + + def _assert_dict_providers_equal(self, expected, actual): + self.java_arrow.vector.util.Validator.compareDictionaryProviders( + expected, actual) + + # Note: no need to call the Java GC anywhere thanks to AutoCloseable + + +class JavaCDataExporter(CDataExporter, _CDataBase): + + def export_schema_from_json(self, json_path, c_schema_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + schema = json_reader.start() + dict_provider = json_reader + self.java_arrow.c.Data.exportSchema( + self.java_allocator, schema, dict_provider, + self._wrap_c_schema_ptr(c_schema_ptr) + ) + + def export_batch_from_json(self, json_path, num_batch, c_array_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + json_reader.start() + if num_batch > 0: + actually_skipped = json_reader.skip(num_batch) + assert actually_skipped == num_batch + with json_reader.read() as batch: + dict_provider = json_reader + self.java_arrow.c.Data.exportVectorSchemaRoot( + self.java_allocator, batch, dict_provider, + self._wrap_c_array_ptr(c_array_ptr)) + + @property + def supports_releasing_memory(self): + return True + + def record_allocation_state(self): + return self.java_allocator.getAllocatedMemory() + + def close(self): + self.java_allocator.close() + + +class JavaCDataImporter(CDataImporter, _CDataBase): + + def import_schema_and_compare_to_json(self, json_path, c_schema_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + json_schema = json_reader.start() + with self.java_arrow.c.CDataDictionaryProvider() as dict_provider: + imported_schema = self.java_arrow.c.Data.importSchema( + self.java_allocator, + self._wrap_c_schema_ptr(c_schema_ptr), + dict_provider) + self._assert_schemas_equal(json_schema, imported_schema) + + def import_batch_and_compare_to_json(self, json_path, num_batch, + c_array_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + schema = json_reader.start() + if num_batch > 0: + actually_skipped = json_reader.skip(num_batch) + assert actually_skipped == num_batch + with json_reader.read() as batch: + with self.java_arrow.vector.VectorSchemaRoot.create( + schema, self.java_allocator) as imported_batch: + # We need to pass a dict provider primed with dictionary ids + # matching those in the schema, hence an empty + # CDataDictionaryProvider would not work here. + dict_provider = (self.java_arrow.vector.dictionary + .DictionaryProvider.MapDictionaryProvider()) + dict_provider.copyStructureFrom(json_reader, self.java_allocator) + with dict_provider: + self.java_arrow.c.Data.importIntoVectorSchemaRoot( + self.java_allocator, + self._wrap_c_array_ptr(c_array_ptr), + imported_batch, dict_provider) + self._assert_batches_equal(batch, imported_batch) + self._assert_dict_providers_equal(json_reader, dict_provider) + + @property + def supports_releasing_memory(self): + return True + + def close(self): + self.java_allocator.close() + + class JavaTester(Tester): PRODUCER = True CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True + C_DATA_SCHEMA_EXPORTER = True + C_DATA_SCHEMA_IMPORTER = True + C_DATA_ARRAY_EXPORTER = True + C_DATA_ARRAY_IMPORTER = True name = 'Java' @@ -186,3 +341,9 @@ def flight_server(self, scenario_name=None): finally: server.kill() server.wait(5) + + def make_c_data_exporter(self): + return JavaCDataExporter(self.debug, self.args) + + def make_c_data_importer(self): + return JavaCDataImporter(self.debug, self.args) diff --git a/docker-compose.yml b/docker-compose.yml index 0e5034346e780..e54c609e54138 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1730,16 +1730,21 @@ services: volumes: *conda-volumes environment: <<: [*common, *ccache] - # tell archery where the arrow binaries are located + ARCHERY_INTEGRATION_WITH_RUST: 0 + # Tell Archery where the arrow C++ binaries are located ARROW_CPP_EXE_PATH: /build/cpp/debug ARROW_GO_INTEGRATION: 1 - ARCHERY_INTEGRATION_WITH_RUST: 0 + ARROW_JAVA_CDATA: "ON" + JAVA_JNI_CMAKE_ARGS: >- + -DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF + -DARROW_JAVA_JNI_ENABLE_C=ON command: ["/arrow/ci/scripts/rust_build.sh /arrow /build && /arrow/ci/scripts/cpp_build.sh /arrow /build && /arrow/ci/scripts/csharp_build.sh /arrow /build && /arrow/ci/scripts/go_build.sh /arrow && - /arrow/ci/scripts/java_build.sh /arrow /build && + /arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build /tmp/dist/java/$$(arch) && + /arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java && /arrow/ci/scripts/js_build.sh /arrow /build && /arrow/ci/scripts/integration_arrow.sh /arrow /build"] diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java index 7408bf71136fa..cd2a464f4fa17 100644 --- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -165,9 +165,9 @@ public List visit(ArrowType.Union type) { return Collections.singletonList(importFixedBytes(type, 0, UnionVector.TYPE_WIDTH)); case Dense: return Arrays.asList(importFixedBytes(type, 0, DenseUnionVector.TYPE_WIDTH), - importFixedBytes(type, 0, DenseUnionVector.OFFSET_WIDTH)); + importFixedBytes(type, 1, DenseUnionVector.OFFSET_WIDTH)); default: - throw new UnsupportedOperationException("Importing buffers for type: " + type); + throw new UnsupportedOperationException("Importing buffers for union type: " + type); } } diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java b/java/c/src/main/java/org/apache/arrow/c/Format.java index 315d3caad7da2..2875e46f749c4 100644 --- a/java/c/src/main/java/org/apache/arrow/c/Format.java +++ b/java/c/src/main/java/org/apache/arrow/c/Format.java @@ -138,6 +138,8 @@ static String asString(ArrowType arrowType) { return "tiD"; case YEAR_MONTH: return "tiM"; + case MONTH_DAY_NANO: + return "tin"; default: throw new UnsupportedOperationException( String.format("Interval type with unit %s is unsupported", type.getUnit())); @@ -277,6 +279,8 @@ static ArrowType asType(String format, long flags) return new ArrowType.Interval(IntervalUnit.YEAR_MONTH); case "tiD": return new ArrowType.Interval(IntervalUnit.DAY_TIME); + case "tin": + return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO); case "+l": return new ArrowType.List(); case "+L": diff --git a/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java b/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java index 21d88f6cd4ba5..09a6afafa0a46 100644 --- a/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java +++ b/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java @@ -44,7 +44,7 @@ final class SchemaImporter { private static final Logger logger = LoggerFactory.getLogger(SchemaImporter.class); private static final int MAX_IMPORT_RECURSION_LEVEL = 64; - private long nextDictionaryID = 1L; + private long nextDictionaryID = 0L; private final BufferAllocator allocator; diff --git a/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java b/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java index 3f793f836d634..9dcb262af4616 100644 --- a/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java @@ -100,7 +100,7 @@ public void testWithDictionary() throws Exception { dictVector.setSafe(2, "cc".getBytes()); dictVector.setValueCount(3); - Dictionary dictionary = new Dictionary(dictVector, new DictionaryEncoding(1L, false, /* indexType= */null)); + Dictionary dictionary = new Dictionary(dictVector, new DictionaryEncoding(0L, false, /* indexType= */null)); provider.put(dictionary); // create vector and encode it @@ -169,7 +169,7 @@ private ArrowStreamReader createMultiBatchReader() throws IOException { dictVector.setSafe(3, "dd".getBytes()); dictVector.setSafe(4, "ee".getBytes()); dictVector.setValueCount(5); - Dictionary dictionary = new Dictionary(dictVector, new DictionaryEncoding(1L, false, /* indexType= */null)); + Dictionary dictionary = new Dictionary(dictVector, new DictionaryEncoding(0L, false, /* indexType= */null)); provider.put(dictionary); Schema schema = new Schema(Collections.singletonList(vector.getField())); diff --git a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java index 06401687a5a66..68d4fc2a81e68 100644 --- a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java @@ -135,7 +135,7 @@ public void roundtripStrings() throws Exception { @Test public void roundtripDictionary() throws Exception { final ArrowType.Int indexType = new ArrowType.Int(32, true); - final DictionaryEncoding encoding = new DictionaryEncoding(1L, false, indexType); + final DictionaryEncoding encoding = new DictionaryEncoding(0L, false, indexType); final Schema schema = new Schema(Collections.singletonList( new Field("dict", new FieldType(/*nullable=*/true, indexType, encoding), Collections.emptyList()))); final List batches = new ArrayList<>(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java b/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java index 6e4c2764bdcc4..1badf4b4ca808 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java @@ -192,6 +192,7 @@ public List getChildrenFromFields() { @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { Preconditions.checkArgument(ownBuffers.isEmpty(), "Null vector has no buffers"); + valueCount = fieldNode.getLength(); } @Override diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java index 698ddac466041..5323ddda838c8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java @@ -121,9 +121,11 @@ public boolean rangeEquals(Range range) { "rightStart %s must be non negative.", range.getRightStart()); Preconditions.checkArgument(range.getRightStart() + range.getLength() <= right.getValueCount(), - "(rightStart + length) %s out of range[0, %s].", 0, right.getValueCount()); + "(rightStart + length) %s out of range[0, %s].", + range.getRightStart() + range.getLength(), right.getValueCount()); Preconditions.checkArgument(range.getLeftStart() + range.getLength() <= left.getValueCount(), - "(leftStart + length) %s out of range[0, %s].", 0, left.getValueCount()); + "(leftStart + length) %s out of range[0, %s].", + range.getLeftStart() + range.getLength(), left.getValueCount()); return left.accept(this, range); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java index 76e1eb9f66d25..f64c32be0f3e9 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java @@ -21,6 +21,9 @@ import java.util.Map; import java.util.Set; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.VisibleForTesting; + /** * A manager for association of dictionary IDs to their corresponding {@link Dictionary}. */ @@ -35,7 +38,7 @@ public interface DictionaryProvider { /** * Implementation of {@link DictionaryProvider} that is backed by a hash-map. */ - class MapDictionaryProvider implements DictionaryProvider { + class MapDictionaryProvider implements AutoCloseable, DictionaryProvider { private final Map map; @@ -49,6 +52,23 @@ public MapDictionaryProvider(Dictionary... dictionaries) { } } + /** + * Initialize the map structure from another provider, but with empty vectors. + * + * @param other the {@link DictionaryProvider} to copy the ids and fields from + * @param allocator allocator to create the empty vectors + */ + // This is currently called using JPype by the integration tests. + @VisibleForTesting + public void copyStructureFrom(DictionaryProvider other, BufferAllocator allocator) { + for (Long id : other.getDictionaryIds()) { + Dictionary otherDict = other.lookup(id); + Dictionary newDict = new Dictionary(otherDict.getVector().getField().createVector(allocator), + otherDict.getEncoding()); + put(newDict); + } + } + public void put(Dictionary dictionary) { map.put(dictionary.getEncoding().getId(), dictionary); } @@ -62,5 +82,12 @@ public final Set getDictionaryIds() { public Dictionary lookup(long id) { return map.get(id); } + + @Override + public void close() { + for (Dictionary dictionary : map.values()) { + dictionary.getVector().close(); + } + } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 742daeef255f8..0c23a664f62d6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -237,6 +237,28 @@ public VectorSchemaRoot read() throws IOException { } } + /** + * Skips a number of record batches in the file. + * + * @param numBatches the number of batches to skip + * @return the actual number of skipped batches. + */ + // This is currently called using JPype by the integration tests. + public int skip(int numBatches) throws IOException { + for (int i = 0; i < numBatches; ++i) { + JsonToken t = parser.nextToken(); + if (t == START_OBJECT) { + parser.skipChildren(); + assert parser.getCurrentToken() == END_OBJECT; + } else if (t == END_ARRAY) { + return i; + } else { + throw new IllegalArgumentException("Invalid token: " + t); + } + } + return numBatches; + } + private abstract class BufferReader { protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException; @@ -692,7 +714,8 @@ private ArrowBuf readIntoBuffer(BufferAllocator allocator, BufferType bufferType } private void readFromJsonIntoVector(Field field, FieldVector vector) throws JsonParseException, IOException { - TypeLayout typeLayout = TypeLayout.getTypeLayout(field.getType()); + ArrowType type = field.getType(); + TypeLayout typeLayout = TypeLayout.getTypeLayout(type); List vectorTypes = typeLayout.getBufferTypes(); ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()]; /* @@ -728,21 +751,18 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) throws Json BufferType bufferType = vectorTypes.get(v); nextFieldIs(bufferType.getName()); int innerBufferValueCount = valueCount; - if (bufferType.equals(OFFSET) && !field.getType().getTypeID().equals(ArrowType.ArrowTypeID.Union)) { - /* offset buffer has 1 additional value capacity */ + if (bufferType.equals(OFFSET) && !(type instanceof ArrowType.Union)) { + /* offset buffer has 1 additional value capacity except for dense unions */ innerBufferValueCount = valueCount + 1; } vectorBuffers[v] = readIntoBuffer(allocator, bufferType, vector.getMinorType(), innerBufferValueCount); } - if (vectorBuffers.length == 0) { - readToken(END_OBJECT); - return; - } - int nullCount = 0; - if (!(vector.getField().getFieldType().getType() instanceof ArrowType.Union)) { + if (type instanceof ArrowType.Null) { + nullCount = valueCount; + } else if (!(type instanceof ArrowType.Union)) { nullCount = BitVectorHelper.getNullCount(vectorBuffers[0], valueCount); } final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 83a8ece0bfb06..f81d049a9257f 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -112,8 +112,8 @@ public ArrowRecordBatch( } long size = arrowBuf.readableBytes(); arrowBuffers.add(new ArrowBuffer(offset, size)); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size); } offset += size; if (alignBuffers) { // align on 8 byte boundaries diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java b/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java index 741972b4ad2a8..0c9ad1e2753f1 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java @@ -17,6 +17,7 @@ package org.apache.arrow.vector.util; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -85,6 +86,31 @@ public static void compareDictionaries( } } + /** + * Validate two dictionary providers are equal in structure and contents. + */ + public static void compareDictionaryProviders( + DictionaryProvider provider1, + DictionaryProvider provider2) { + List ids1 = new ArrayList(provider1.getDictionaryIds()); + List ids2 = new ArrayList(provider2.getDictionaryIds()); + java.util.Collections.sort(ids1); + java.util.Collections.sort(ids2); + if (!ids1.equals(ids2)) { + throw new IllegalArgumentException("Different ids in dictionary providers:\n" + + ids1 + "\n" + ids2); + } + for (long id : ids1) { + Dictionary dict1 = provider1.lookup(id); + Dictionary dict2 = provider2.lookup(id); + try { + compareFieldVectors(dict1.getVector(), dict2.getVector()); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Different dictionaries:\n" + dict1 + "\n" + dict2, e); + } + } + } + /** * Validate two arrow vectorSchemaRoot are equal. *