Description
Apache Iceberg version
1.10.0 (latest release)
Query engine
Flink
Please describe the bug 🐞
Hi all,
I have two Flink (v1.20.3) jobs: one writes records to an Iceberg table with org.apache.iceberg.flink.sink.FlinkSink, and the other streams incremental changes from that table with org.apache.iceberg.flink.source.IcebergSource. This was working fine.
I then switched from FlinkSink to org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink. Writes to the Iceberg table still work fine, but the incremental changes are no longer picked up by the IcebergSource in the second Flink job. The IcebergSource recognises the new snapshots, but logs "Discovered 0 splits from incremental scan". However, when I query the difference between the snapshots with Athena, there definitely are incremental records there.
I couldn't find any documentation on what to consider when moving to DynamicIcebergSink. Is there something I'm missing here?
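One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the Flink IcebergSource plans streaming reads with Iceberg's incremental append scan, which only considers snapshots whose operation is `append`. If the DynamicIcebergSink commits were recorded with a different operation (for example `overwrite`), the source would see the new snapshots but plan no splits from them. The `operation` column of the table's `snapshots` metadata table shows what each commit recorded. The self-contained Java sketch below models only that filtering rule; `IncrementalAppendModel`, `SnapshotInfo`, and `appendSnapshotsBetween` are hypothetical names, not Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of an append-only incremental scan.
 * It is NOT Iceberg's implementation; it only mirrors the rule that such a
 * scan considers snapshots whose operation is "append" and skips the rest.
 */
public class IncrementalAppendModel {

    /** Hypothetical stand-in for a snapshot: its ID and recorded operation. */
    record SnapshotInfo(long id, String operation) {}

    /**
     * Returns the snapshots after fromExclusive, up to and including toInclusive,
     * whose operation is "append". The history is assumed to be ordered oldest first.
     */
    static List<SnapshotInfo> appendSnapshotsBetween(
            List<SnapshotInfo> history, long fromExclusive, long toInclusive) {
        List<SnapshotInfo> result = new ArrayList<>();
        boolean inRange = false;
        for (SnapshotInfo snapshot : history) {
            // Only snapshots after the starting one are candidates, and only appends count.
            if (inRange && "append".equals(snapshot.operation())) {
                result.add(snapshot);
            }
            if (snapshot.id() == fromExclusive) {
                inRange = true;
            }
            if (snapshot.id() == toInclusive) {
                break; // stop once the ending snapshot has been considered
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed histories: every commit is an append vs. newer commits recorded as "overwrite".
        List<SnapshotInfo> appendOnly = List.of(
                new SnapshotInfo(1, "append"),
                new SnapshotInfo(2, "append"),
                new SnapshotInfo(3, "append"));
        List<SnapshotInfo> overwriteCommits = List.of(
                new SnapshotInfo(1, "append"),
                new SnapshotInfo(2, "overwrite"),
                new SnapshotInfo(3, "overwrite"));

        System.out.println(appendSnapshotsBetween(appendOnly, 1, 3).size());       // prints 2
        System.out.println(appendSnapshotsBetween(overwriteCommits, 1, 3).size()); // prints 0
    }
}
```

Under that assumption, the second history reproduces the "Discovered 0 splits" symptom: the new snapshots exist, but none of them qualifies for an append-only scan.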
My sink is defined like this:
```java
DynamicIcebergSink.Builder<GenericRecord> flinkSinkBuilder =
    DynamicIcebergSink.forInput(dataStream)
        .generator((inputRecord, out) -> {
            var record = new DynamicRecord(
                TableIdentifier.of(s3_table_db, s3_table_name),
                "main", // branch
                icebergSchema,
                avroGenericRecordToRowDataMapper.map(inputRecord),
                partitionSpec,
                partitionSpec.isPartitioned() ? DistributionMode.HASH : DistributionMode.NONE,
                2); // write parallelism
            record.setUpsertMode(false);
            out.collect(record);
        })
        .catalogLoader(icebergCatalogLoader)
        .immediateTableUpdate(true)
        .set("write.upsert.enabled", "false");
```

Thank you.
Willingness to contribute
- I can contribute a fix for this bug independently
- I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- I cannot contribute a fix for this bug at this time