
Commit 33aa54e

WIP
1 parent 0793432 commit 33aa54e

7 files changed, +217 -32 lines changed

ci/scripts/integration_arrow.sh (+8 -7)
@@ -23,22 +23,23 @@ arrow_dir=${1}
 gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
 
 pip install -e $arrow_dir/dev/archery[integration]
-# For C# C Data Interface testing
-pip install pythonnet
+# For C Data Interface testing
+pip install jpype1 pythonnet
 
 # Get more detailed context on crashes
 export PYTHONFAULTHANDLER=1
 
+# --run-ipc \
+# --run-flight \
+
 # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
 time archery integration \
     --run-c-data \
-    --run-ipc \
-    --run-flight \
     --with-cpp=1 \
-    --with-csharp=1 \
+    --with-csharp=0 \
    --with-java=1 \
-    --with-js=1 \
-    --with-go=1 \
+    --with-js=0 \
+    --with-go=0 \
    --gold-dirs=$gold_dir/0.14.1 \
    --gold-dirs=$gold_dir/0.17.1 \
    --gold-dirs=$gold_dir/1.0.0-bigendian \
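
A quick gloss on what `--run-c-data` exercises: for every producer/consumer pair, Archery allocates C Data Interface structs with cffi and asks one implementation to export into them while the other imports and compares against the gold JSON. The sketch below is illustrative only (the real driver lives in Archery's integration runner; `roundtrip_schema` is a made-up helper name), using the `CDataExporter`/`CDataImporter` API this commit implements for Java:

    # Hypothetical sketch of a schema round trip through the C Data Interface.
    # `exporter`/`importer` are tester objects (e.g. JavaCDataExporter below)
    # and `ffi` is the cffi instance from Archery's integration cdata module.
    def roundtrip_schema(exporter, importer, json_path, ffi):
        c_schema = ffi.new("struct ArrowSchema*")  # zero-initialized by cffi
        exporter.export_schema_from_json(json_path, c_schema)
        importer.import_schema_and_compare_to_json(json_path, c_schema)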

dev/archery/archery/integration/datagen.py (+11 -12)
@@ -1700,9 +1700,9 @@ def generate_unions_case():
 
 
 def generate_dictionary_case():
-    dict0 = Dictionary(0, StringField('dictionary1'), size=10, name='DICT0')
-    dict1 = Dictionary(1, StringField('dictionary1'), size=5, name='DICT1')
-    dict2 = Dictionary(2, get_field('dictionary2', 'int64'),
+    dict0 = Dictionary(1, StringField('dictionary1'), size=10, name='DICT0')
+    dict1 = Dictionary(2, StringField('dictionary1'), size=5, name='DICT1')
+    dict2 = Dictionary(3, get_field('dictionary2', 'int64'),
                        size=50, name='DICT2')
 
     fields = [
@@ -1716,14 +1716,13 @@ def generate_dictionary_case():
 
 
 def generate_dictionary_unsigned_case():
-    dict0 = Dictionary(0, StringField('dictionary0'), size=5, name='DICT0')
-    dict1 = Dictionary(1, StringField('dictionary1'), size=5, name='DICT1')
-    dict2 = Dictionary(2, StringField('dictionary2'), size=5, name='DICT2')
+    dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0')
+    dict1 = Dictionary(2, StringField('dictionary1'), size=5, name='DICT1')
+    dict2 = Dictionary(3, StringField('dictionary2'), size=5, name='DICT2')
 
     # TODO: JavaScript does not support uint64 dictionary indices, so disabled
     # for now
-
-    # dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3')
+    # dict3 = Dictionary(4, StringField('dictionary3'), size=5, name='DICT3')
     fields = [
         DictionaryField('f0', get_field('', 'uint8'), dict0),
         DictionaryField('f1', get_field('', 'uint16'), dict1),
@@ -1736,18 +1735,18 @@ def generate_dictionary_unsigned_case():
 
 
 def generate_nested_dictionary_case():
-    dict0 = Dictionary(0, StringField('str'), size=10, name='DICT0')
+    dict0 = Dictionary(1, StringField('str'), size=10, name='DICT0')
 
     list_of_dict = ListField(
         'list',
         DictionaryField('str_dict', get_field('', 'int8'), dict0))
-    dict1 = Dictionary(1, list_of_dict, size=30, name='DICT1')
+    dict1 = Dictionary(2, list_of_dict, size=30, name='DICT1')
 
     struct_of_dict = StructField('struct', [
         DictionaryField('str_dict_a', get_field('', 'int8'), dict0),
         DictionaryField('str_dict_b', get_field('', 'int8'), dict0)
     ])
-    dict2 = Dictionary(2, struct_of_dict, size=30, name='DICT2')
+    dict2 = Dictionary(3, struct_of_dict, size=30, name='DICT2')
 
     fields = [
         DictionaryField('list_dict', get_field('', 'int8'), dict1),
@@ -1760,7 +1759,7 @@ def generate_nested_dictionary_case():
 
 
 def generate_extension_case():
-    dict0 = Dictionary(0, StringField('dictionary0'), size=5, name='DICT0')
+    dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0')
 
     uuid_type = ExtensionType('uuid', 'uuid-serialized',
                               FixedSizeBinaryField('', 16))
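
Every dictionary id in these generators shifts up by one (including the commented-out `dict3`), making them 1-based. In the integration JSON a dictionary-encoded field references its dictionary batch purely by id, so any unique values work; an abridged sketch of that shape, assuming the standard Arrow integration-JSON layout:

    # Abridged: the field declares the value type plus a "dictionary" member,
    # and a separate dictionary batch carries the matching id (here 1).
    field = {
        "name": "f0",
        "type": {"name": "utf8"},
        "dictionary": {
            "id": 1,
            "indexType": {"name": "int", "bitWidth": 8, "isSigned": True},
            "isOrdered": False,
        },
    }
    dictionary_batch = {"id": 1, "data": {"count": 10, "columns": ["..."]}}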

dev/archery/archery/integration/tester_java.py (+163 -8)
@@ -16,10 +16,12 @@
 # under the License.
 
 import contextlib
+import functools
 import os
 import subprocess
 
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
 from .util import run_cmd, log
 from ..utils.source import ARROW_ROOT_DEFAULT
 
@@ -42,18 +44,25 @@ def load_version_from_pom():
     "ARROW_JAVA_INTEGRATION_JAR",
     os.path.join(
         ARROW_ROOT_DEFAULT,
-        "java/tools/target/arrow-tools-{}-"
-        "jar-with-dependencies.jar".format(_arrow_version),
-    ),
+        "java/tools/target",
+        f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar"
+    )
+)
+_ARROW_C_DATA_JAR = os.environ.get(
+    "ARROW_C_DATA_JAVA_INTEGRATION_JAR",
+    os.path.join(
+        ARROW_ROOT_DEFAULT,
+        "java/c/target",
+        f"arrow-c-data-{_arrow_version}.jar"
+    )
 )
 _ARROW_FLIGHT_JAR = os.environ.get(
     "ARROW_FLIGHT_JAVA_INTEGRATION_JAR",
     os.path.join(
         ARROW_ROOT_DEFAULT,
-        "java/flight/flight-integration-tests/target/"
-        "flight-integration-tests-{}-jar-with-dependencies.jar".format(
-            _arrow_version),
-    ),
+        "java/flight/flight-integration-tests/target",
+        f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar"
+    )
 )
 _ARROW_FLIGHT_SERVER = (
     "org.apache.arrow.flight.integration.tests.IntegrationTestServer"
@@ -63,11 +72,151 @@ def load_version_from_pom():
 )
 
 
+@functools.lru_cache
+def setup_jpype():
+    import jpype
+    jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}"
+    # XXX Didn't manage to tone down the logging level here (DEBUG -> INFO)
+    jpype.startJVM(jpype.getDefaultJVMPath(),
+                   "-Djava.class.path=" + jar_path)
+
+
+class _CDataBase:
+
+    def __init__(self, debug, args):
+        import jpype
+        self.debug = debug
+        self.args = args
+        self.ffi = cdata.ffi()
+        setup_jpype()
+        # JPype pointers to java.io, org.apache.arrow...
+        self.java_io = jpype.JPackage("java").io
+        self.java_arrow = jpype.JPackage("org").apache.arrow
+        self.java_allocator = self._make_java_allocator()
+
+    def _pointer_to_int(self, c_ptr):
+        return int(self.ffi.cast('uintptr_t', c_ptr))
+
+    def _wrap_c_schema_ptr(self, c_schema_ptr):
+        return self.java_arrow.c.ArrowSchema.wrap(
+            self._pointer_to_int(c_schema_ptr))
+
+    def _wrap_c_array_ptr(self, c_array_ptr):
+        return self.java_arrow.c.ArrowArray.wrap(
+            self._pointer_to_int(c_array_ptr))
+
+    def _make_java_allocator(self):
+        # Return a new allocator
+        return self.java_arrow.memory.RootAllocator()
+
+    def _assert_schemas_equal(self, expected, actual):
+        # XXX This is fragile for dictionaries, as Schema.equals compares
+        # dictionary ids!
+        # Should perhaps instead add a logical comparison function in
+        # org.apache.arrow.vector.util.DictionaryUtil
+        if not expected.equals(actual):
+            raise AssertionError(
+                f"Java Schemas are not equal:\n"
+                f"* expected = {expected.toString()}\n"
+                f"* actual = {actual.toString()}")
+
+
+class JavaCDataExporter(CDataExporter, _CDataBase):
+
+    def export_schema_from_json(self, json_path, c_schema_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            schema = json_reader.start()
+            dict_provider = json_reader
+            self.java_arrow.c.Data.exportSchema(
+                self.java_allocator, schema, dict_provider,
+                self._wrap_c_schema_ptr(c_schema_ptr)
+            )
+
+    def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            json_reader.start()
+            if num_batch > 0:
+                actually_skipped = json_reader.skip(num_batch)
+                assert actually_skipped == num_batch
+            with json_reader.read() as batch:
+                dict_provider = json_reader
+                self.java_arrow.c.Data.exportVectorSchemaRoot(
+                    self.java_allocator, batch, dict_provider,
+                    self._wrap_c_array_ptr(c_array_ptr))
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def record_allocation_state(self):
+        return self.java_allocator.getAllocatedMemory()
+
+    def compare_allocation_state(self, recorded, gc_until):
+        def pred():
+            return self.java_allocator.getAllocatedMemory() == recorded
+
+        return gc_until(pred)
+
+
+class JavaCDataImporter(CDataImporter, _CDataBase):
+
+    def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            json_schema = json_reader.start()
+            with self.java_arrow.c.CDataDictionaryProvider() as dict_provider:
+                imported_schema = self.java_arrow.c.Data.importSchema(
+                    self.java_allocator,
+                    self._wrap_c_schema_ptr(c_schema_ptr),
+                    dict_provider)
+                self._assert_schemas_equal(json_schema, imported_schema)
+
+    def import_batch_and_compare_to_json(self, json_path, num_batch,
+                                         c_array_ptr):
+        json_file = self.java_io.File(json_path)
+        with self.java_arrow.vector.ipc.JsonFileReader(
+                json_file, self.java_allocator) as json_reader:
+            schema = json_reader.start()
+            if num_batch > 0:
+                actually_skipped = json_reader.skip(num_batch)
+                assert actually_skipped == num_batch
+            with (json_reader.read() as batch,
+                  self.java_arrow.vector.VectorSchemaRoot.create(
+                      schema, self.java_allocator) as imported_batch):
+                # We need to pass a dict provider primed with dictionary ids
+                # matching those in the schema, hence an empty
+                # CDataDictionaryProvider would not work here!
+                dict_provider = json_reader
+                self.java_arrow.c.Data.importIntoVectorSchemaRoot(
+                    self.java_allocator,
+                    self._wrap_c_array_ptr(c_array_ptr),
+                    imported_batch, dict_provider)
+                # TODO print nice error message if not equal
+                assert imported_batch.equals(batch)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def gc_until(self, predicate):
+        # No need to call the Java GC thanks to AutoCloseable (?)
+        return predicate()
+
+
 class JavaTester(Tester):
     PRODUCER = True
     CONSUMER = True
     FLIGHT_SERVER = True
     FLIGHT_CLIENT = True
+    C_DATA_SCHEMA_EXPORTER = True
+    C_DATA_SCHEMA_IMPORTER = True
+    C_DATA_ARRAY_EXPORTER = True
+    C_DATA_ARRAY_IMPORTER = True
 
     name = 'Java'
 
@@ -186,3 +335,9 @@ def flight_server(self, scenario_name=None):
         finally:
             server.kill()
             server.wait(5)
+
+    def make_c_data_exporter(self):
+        return JavaCDataExporter(self.debug, self.args)
+
+    def make_c_data_importer(self):
+        return JavaCDataImporter(self.debug, self.args)
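
The new tester embeds a JVM in the Archery process through JPype rather than shelling out to a Java executable, which lets schema and array pointers cross the C Data Interface directly. A minimal standalone sketch of that pattern (the jar path is illustrative; JPype exposes java.lang.AutoCloseable objects as Python context managers, which is what the `with` blocks above rely on):

    import jpype

    # Illustrative classpath; the tester assembles it from _ARROW_TOOLS_JAR
    # and the new _ARROW_C_DATA_JAR instead.
    jpype.startJVM(jpype.getDefaultJVMPath(),
                   "-Djava.class.path=java/c/target/arrow-c-data.jar")

    java_arrow = jpype.JPackage("org").apache.arrow
    # RootAllocator implements AutoCloseable, so `with` releases it on exit.
    with java_arrow.memory.RootAllocator() as allocator:
        assert allocator.getAllocatedMemory() == 0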

docker-compose.yml (+8 -3)
@@ -1730,16 +1730,21 @@ services:
     volumes: *conda-volumes
     environment:
       <<: [*common, *ccache]
-      # tell archery where the arrow binaries are located
+      ARCHERY_INTEGRATION_WITH_RUST: 0
+      # Tell Archery where the arrow C++ binaries are located
       ARROW_CPP_EXE_PATH: /build/cpp/debug
       ARROW_GO_INTEGRATION: 1
-      ARCHERY_INTEGRATION_WITH_RUST: 0
+      ARROW_JAVA_CDATA: "ON"
+      JAVA_JNI_CMAKE_ARGS: >-
+        -DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF
+        -DARROW_JAVA_JNI_ENABLE_C=ON
     command:
       ["/arrow/ci/scripts/rust_build.sh /arrow /build &&
        /arrow/ci/scripts/cpp_build.sh /arrow /build &&
        /arrow/ci/scripts/csharp_build.sh /arrow /build &&
        /arrow/ci/scripts/go_build.sh /arrow &&
-       /arrow/ci/scripts/java_build.sh /arrow /build &&
+       /arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build /tmp/dist/java/$$(arch) &&
+       /arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java &&
        /arrow/ci/scripts/js_build.sh /arrow /build &&
        /arrow/ci/scripts/integration_arrow.sh /arrow /build"]

java/c/src/main/java/org/apache/arrow/c/Format.java (+4)
@@ -138,6 +138,8 @@ static String asString(ArrowType arrowType) {
         return "tiD";
       case YEAR_MONTH:
         return "tiM";
+      case MONTH_DAY_NANO:
+        return "tin";
       default:
         throw new UnsupportedOperationException(
             String.format("Interval type with unit %s is unsupported", type.getUnit()));
@@ -277,6 +279,8 @@ static ArrowType asType(String format, long flags)
         return new ArrowType.Interval(IntervalUnit.YEAR_MONTH);
       case "tiD":
         return new ArrowType.Interval(IntervalUnit.DAY_TIME);
+      case "tin":
+        return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO);
      case "+l":
        return new ArrowType.List();
      case "+L":

java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java (+21)
@@ -237,6 +237,27 @@ public VectorSchemaRoot read() throws IOException {
     }
   }
 
+  /**
+   * Skips a number of record batches in the file.
+   *
+   * @param numBatches the number of batches to skip
+   * @return the actual number of skipped batches.
+   */
+  public int skip(int numBatches) throws IOException {
+    for (int i = 0; i < numBatches; ++i) {
+      JsonToken t = parser.nextToken();
+      if (t == START_OBJECT) {
+        parser.skipChildren();
+        assert parser.getCurrentToken() == END_OBJECT;
+      } else if (t == END_ARRAY) {
+        return i;
+      } else {
+        throw new IllegalArgumentException("Invalid token: " + t);
+      }
+    }
+    return numBatches;
+  }
+
   private abstract class BufferReader {
     protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException;
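
`skip()` lets the new C Data testers position the reader on an arbitrary batch without materializing the earlier ones (see `export_batch_from_json` in tester_java.py above). A sketch of the contract as called from Python through JPype, reusing `java_arrow`, `json_file` and `allocator` from the tester code:

    with java_arrow.vector.ipc.JsonFileReader(json_file, allocator) as reader:
        reader.start()            # reads and returns the schema
        skipped = reader.skip(2)  # fast-forward past batches 0 and 1
        # skip() reports how many batches it actually consumed, so a short
        # count means the file had fewer batches than requested.
        assert skipped == 2
        with reader.read() as batch:  # batch number 2
            ...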

java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java (+2 -2)
@@ -112,8 +112,8 @@ public ArrowRecordBatch(
       }
       long size = arrowBuf.readableBytes();
       arrowBuffers.add(new ArrowBuffer(offset, size));
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size);
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size);
       }
       offset += size;
       if (alignBuffers) { // align on 8 byte boundaries
