Spark, Flink: Add null engineSchema fallback for format model writers#15688
pvary merged 8 commits into apache:main from
Conversation
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
Update the tests to remove the unnecessary engineSchema settings
Removing … for FlinkParquetWriters.
```java
public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType engineSchema) {
  this(engineSchema != null ? engineSchema : SparkSchemaUtil.convert(icebergSchema));
}
```
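The change hinges on a plain null-coalescing choice: use the caller-supplied engine schema when present, otherwise derive one from the Iceberg schema. A minimal, dependency-free sketch of that idiom, with a hypothetical `EngineSchemas` class and plain strings standing in for Spark's `StructType` and the Iceberg `Schema` (neither the class nor `convert` here is a real Iceberg or Spark API):

```java
// Sketch of the null-fallback pattern used by the constructor above.
// EngineSchemas and this convert() are hypothetical stand-ins, not
// Iceberg or Spark APIs.
public class EngineSchemas {
    // Stand-in for SparkSchemaUtil.convert(icebergSchema): derive an
    // engine-side schema representation from the Iceberg one.
    static String convert(String icebergSchema) {
        return "derived:" + icebergSchema;
    }

    // Prefer the explicitly supplied engine schema; otherwise derive it.
    static String resolveSchema(String icebergSchema, String engineSchema) {
        return engineSchema != null ? engineSchema : convert(icebergSchema);
    }
}
```

With this shape, `resolveSchema("id:int", "struct<id:int>")` keeps the explicit schema, while `resolveSchema("id:int", null)` falls back to the derived one, which is the behavior the PR adds.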
Do we have tests to cover both the null and not null path?
Thank you for the review @huaxingao.
I have added two new tests in BaseFormatModelTests that cover the null path, based on your and @pvary's suggestions: testDataWriterEngineWriteWithoutEngineSchema and testEqualityDeleteWriterEngineWriteWithoutEngineSchema. The existing tests with an explicit engineSchema are also still present.
We can update all of the Spark and Flink versions in one PR, and then we can have a test where it is removed. Also, reflecting on @huaxingao's comment, I think we should keep the other tests as well.
Thank you, Péter. I have applied the fallback to all Spark and Flink versions, and added new tests for the null engineSchema path while keeping the existing tests that set it explicitly.
data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
```java
// Read back and verify
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
List<Record> readRecords;
try (CloseableIterable<Record> reader =
    FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
        .project(schema)
        .build()) {
  readRecords = ImmutableList.copyOf(reader);
}

DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, readRecords);
```
Could we generalize this to a method?
I think @Guosmilesmile already did this in his #15633 PR.
I have extracted a readAndAssertGenericRecords helper, inspired by @Guosmilesmile's writeGenericRecords pattern in #15633.
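The extracted helper presumably wraps the read-back-and-compare block discussed in this thread: drain the reader, then assert the result matches the written records. A dependency-free sketch of that shape, with plain Java `Iterable`/`List` standing in for Iceberg's `CloseableIterable` and `DataTestHelpers` (the class and method names here are illustrative, not the actual helper's signature):

```java
import java.util.ArrayList;
import java.util.List;

public class ReadAssertSketch {
    // Drain an Iterable into a list, mirroring ImmutableList.copyOf(reader).
    static <T> List<T> readAll(Iterable<T> reader) {
        List<T> out = new ArrayList<>();
        for (T record : reader) {
            out.add(record);
        }
        return out;
    }

    // Read everything back and assert it matches what was written,
    // which is the job the extracted helper is described as doing.
    static <T> void readAndAssert(Iterable<T> reader, List<T> expected) {
        List<T> actual = readAll(reader);
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but read " + actual);
        }
    }
}
```

Centralizing this keeps each test focused on how the file was written, while the read-back and equality check stay in one place.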
…s.java Co-authored-by: pvary <[email protected]>
…s.java Co-authored-by: pvary <[email protected]>
Merged to main.
When engineSchema is not set, format model writers now derive it from the Iceberg schema instead of failing with a null error. Discussed in "Add TCK for File Format API".

Spark (4.1, 4.0, 3.5, 3.4): Added a SparkAvroWriter constructor with the null fallback. Updated SparkFormatModels. Parquet already had this fallback; ORC ignores engineSchema.

Flink (2.1, 2.0, 1.20): Added the null fallback in FlinkAvroWriter, FlinkParquetWriters, and FlinkOrcWriter. Updated FlinkFormatModels.

Tests: Added testDataWriterEngineWriteWithoutEngineSchema and testEqualityDeleteWriterEngineWriteWithoutEngineSchema in BaseFormatModelTests to cover the null engineSchema path. Existing tests with an explicit engineSchema are kept.

Part of #15415
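The new tests exercise writer construction without an engine schema, while the existing tests keep the explicit path covered. A dependency-free sketch of those two construction paths, with a hypothetical `SketchWriter` in place of the real Spark/Flink writers and a `derive` stand-in for the schema-conversion utilities:

```java
// Hypothetical stand-in for a format model writer; not an Iceberg class.
public class SketchWriter {
    final String engineSchema;

    // Mirrors the PR's fallback: derive the engine schema from the
    // Iceberg schema when the caller passes null.
    SketchWriter(String icebergSchema, String engineSchema) {
        this.engineSchema =
            engineSchema != null ? engineSchema : derive(icebergSchema);
    }

    // Stand-in for conversion utilities like SparkSchemaUtil.convert;
    // illustrative only.
    static String derive(String icebergSchema) {
        return "derived<" + icebergSchema + ">";
    }
}
```

The explicit path, `new SketchWriter("id:int", "struct<id:int>")`, keeps the supplied schema; the null path, `new SketchWriter("id:int", null)`, derives one. That split is exactly what the two new tests cover alongside the existing explicit-schema tests.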