From 60307829a83da9d4dd5fe4df87fe95e47d4fd665 Mon Sep 17 00:00:00 2001 From: Ilia Chibaev Date: Tue, 11 Nov 2025 11:09:50 +0100 Subject: [PATCH 1/4] Flink: add append capability to dynamic iceberg sink --- .../flink/sink/dynamic/DynamicCommitter.java | 71 +++++++++++++------ .../flink/sink/dynamic/DynamicCommitter.java | 71 +++++++++++++------ .../flink/sink/dynamic/DynamicCommitter.java | 71 +++++++++++++------ 3 files changed, 150 insertions(+), 63 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 54d506b66328..447c8a7c00e9 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -31,6 +31,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; @@ -302,30 +303,58 @@ private void commitDeltaTxn( CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry> e : pendingResults.entrySet()) { - long checkpointId = e.getKey(); - List writeResults = e.getValue(); - - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - for (WriteResult result : writeResults) { - // Row delta validations are not needed for streaming changes that write equality deletes. - // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that are - // being added in this commit. There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced by - // the position delete files that are being committed. - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + if (summary.deleteFilesCount() == 0) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); + for (List resultList : pendingResults.values()) { + for (WriteResult result : resultList) { + Preconditions.checkState( + result.referencedDataFiles().length == 0, + "Should have no referenced data files for append."); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); + } } + String description = "append"; - // Every Flink checkpoint contains a set of independent changes which can be committed - // together. While it is technically feasible to combine append-only data across checkpoints, - // for the sake of simplicity, we do not implement this (premature) optimization. Multiple - // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint - // intervals or when concurrent checkpointing is enabled. 
+ // fail all commits as really its only one commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + table, + branch, + appendFiles, + summary, + description, + newFlinkJobId, + operatorId, + pendingResults.lastKey()); + } else { + for (Map.Entry> e : pendingResults.entrySet()) { + long checkpointId = e.getKey(); + List writeResults = e.getValue(); + + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + for (WriteResult result : writeResults) { + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that + // are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced + // by + // the position delete files that are being committed. + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + } + + // Every Flink checkpoint contains a set of independent changes which can be committed + // together. While it is technically feasible to combine append-only data across + // checkpoints, + // for the sake of simplicity, we do not implement this (premature) optimization. Multiple + // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint + // intervals or when concurrent checkpointing is enabled. + commitOperation( + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + } } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 54d506b66328..447c8a7c00e9 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -31,6 +31,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; @@ -302,30 +303,58 @@ private void commitDeltaTxn( CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry> e : pendingResults.entrySet()) { - long checkpointId = e.getKey(); - List writeResults = e.getValue(); - - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - for (WriteResult result : writeResults) { - // Row delta validations are not needed for streaming changes that write equality deletes. - // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that are - // being added in this commit. 
There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced by - // the position delete files that are being committed. - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + if (summary.deleteFilesCount() == 0) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); + for (List resultList : pendingResults.values()) { + for (WriteResult result : resultList) { + Preconditions.checkState( + result.referencedDataFiles().length == 0, + "Should have no referenced data files for append."); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); + } } + String description = "append"; - // Every Flink checkpoint contains a set of independent changes which can be committed - // together. While it is technically feasible to combine append-only data across checkpoints, - // for the sake of simplicity, we do not implement this (premature) optimization. Multiple - // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint - // intervals or when concurrent checkpointing is enabled. + // fail all commits as really its only one commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + table, + branch, + appendFiles, + summary, + description, + newFlinkJobId, + operatorId, + pendingResults.lastKey()); + } else { + for (Map.Entry> e : pendingResults.entrySet()) { + long checkpointId = e.getKey(); + List writeResults = e.getValue(); + + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + for (WriteResult result : writeResults) { + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that + // are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced + // by + // the position delete files that are being committed. + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + } + + // Every Flink checkpoint contains a set of independent changes which can be committed + // together. While it is technically feasible to combine append-only data across + // checkpoints, + // for the sake of simplicity, we do not implement this (premature) optimization. Multiple + // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint + // intervals or when concurrent checkpointing is enabled. 
+ commitOperation( + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 54d506b66328..447c8a7c00e9 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -31,6 +31,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; @@ -302,30 +303,58 @@ private void commitDeltaTxn( CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry> e : pendingResults.entrySet()) { - long checkpointId = e.getKey(); - List writeResults = e.getValue(); - - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - for (WriteResult result : writeResults) { - // Row delta validations are not needed for streaming changes that write equality deletes. - // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that are - // being added in this commit. There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced by - // the position delete files that are being committed. - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + if (summary.deleteFilesCount() == 0) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); + for (List resultList : pendingResults.values()) { + for (WriteResult result : resultList) { + Preconditions.checkState( + result.referencedDataFiles().length == 0, + "Should have no referenced data files for append."); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); + } } + String description = "append"; - // Every Flink checkpoint contains a set of independent changes which can be committed - // together. While it is technically feasible to combine append-only data across checkpoints, - // for the sake of simplicity, we do not implement this (premature) optimization. Multiple - // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint - // intervals or when concurrent checkpointing is enabled. + // fail all commits as really its only one commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + table, + branch, + appendFiles, + summary, + description, + newFlinkJobId, + operatorId, + pendingResults.lastKey()); + } else { + for (Map.Entry> e : pendingResults.entrySet()) { + long checkpointId = e.getKey(); + List writeResults = e.getValue(); + + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + for (WriteResult result : writeResults) { + // Row delta validations are not needed for streaming changes that write equality deletes. 
+ // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that + // are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced + // by + // the position delete files that are being committed. + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + } + + // Every Flink checkpoint contains a set of independent changes which can be committed + // together. While it is technically feasible to combine append-only data across + // checkpoints, + // for the sake of simplicity, we do not implement this (premature) optimization. Multiple + // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint + // intervals or when concurrent checkpointing is enabled. + commitOperation( + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + } } } From 0621c1b2b257c655a4bb523d1518de4f8d9a3132 Mon Sep 17 00:00:00 2001 From: Ilia Chibaev Date: Tue, 11 Nov 2025 11:42:32 +0100 Subject: [PATCH 2/4] revert changes to earlier versions --- .../flink/sink/dynamic/DynamicCommitter.java | 71 ++++++------------- .../flink/sink/dynamic/DynamicCommitter.java | 71 ++++++------------- 2 files changed, 42 insertions(+), 100 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 447c8a7c00e9..54d506b66328 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -31,7 +31,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; @@ -303,58 +302,30 @@ private void commitDeltaTxn( CommitSummary summary, String newFlinkJobId, String operatorId) { - if (summary.deleteFilesCount() == 0) { - // To be compatible with iceberg format V1. - AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); - for (List resultList : pendingResults.values()) { - for (WriteResult result : resultList) { - Preconditions.checkState( - result.referencedDataFiles().length == 0, - "Should have no referenced data files for append."); - Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); - } + for (Map.Entry> e : pendingResults.entrySet()) { + long checkpointId = e.getKey(); + List writeResults = e.getValue(); + + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + for (WriteResult result : writeResults) { + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. 
Position deletes + // committed to the table in this path are used only to delete rows from data files that are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced by + // the position delete files that are being committed. + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); } - String description = "append"; - // fail all commits as really its only one + // Every Flink checkpoint contains a set of independent changes which can be committed + // together. While it is technically feasible to combine append-only data across checkpoints, + // for the sake of simplicity, we do not implement this (premature) optimization. Multiple + // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint + // intervals or when concurrent checkpointing is enabled. commitOperation( - table, - branch, - appendFiles, - summary, - description, - newFlinkJobId, - operatorId, - pendingResults.lastKey()); - } else { - for (Map.Entry> e : pendingResults.entrySet()) { - long checkpointId = e.getKey(); - List writeResults = e.getValue(); - - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - for (WriteResult result : writeResults) { - // Row delta validations are not needed for streaming changes that write equality deletes. - // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that - // are - // being added in this commit. There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced - // by - // the position delete files that are being committed. - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); - } - - // Every Flink checkpoint contains a set of independent changes which can be committed - // together. While it is technically feasible to combine append-only data across - // checkpoints, - // for the sake of simplicity, we do not implement this (premature) optimization. Multiple - // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint - // intervals or when concurrent checkpointing is enabled. 
- commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); - } + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 447c8a7c00e9..54d506b66328 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -31,7 +31,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; @@ -303,58 +302,30 @@ private void commitDeltaTxn( CommitSummary summary, String newFlinkJobId, String operatorId) { - if (summary.deleteFilesCount() == 0) { - // To be compatible with iceberg format V1. - AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); - for (List resultList : pendingResults.values()) { - for (WriteResult result : resultList) { - Preconditions.checkState( - result.referencedDataFiles().length == 0, - "Should have no referenced data files for append."); - Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); - } + for (Map.Entry> e : pendingResults.entrySet()) { + long checkpointId = e.getKey(); + List writeResults = e.getValue(); + + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + for (WriteResult result : writeResults) { + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced by + // the position delete files that are being committed. + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); } - String description = "append"; - // fail all commits as really its only one + // Every Flink checkpoint contains a set of independent changes which can be committed + // together. While it is technically feasible to combine append-only data across checkpoints, + // for the sake of simplicity, we do not implement this (premature) optimization. Multiple + // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint + // intervals or when concurrent checkpointing is enabled. commitOperation( - table, - branch, - appendFiles, - summary, - description, - newFlinkJobId, - operatorId, - pendingResults.lastKey()); - } else { - for (Map.Entry> e : pendingResults.entrySet()) { - long checkpointId = e.getKey(); - List writeResults = e.getValue(); - - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - for (WriteResult result : writeResults) { - // Row delta validations are not needed for streaming changes that write equality deletes. 
- // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that - // are - // being added in this commit. There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced - // by - // the position delete files that are being committed. - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); - } - - // Every Flink checkpoint contains a set of independent changes which can be committed - // together. While it is technically feasible to combine append-only data across - // checkpoints, - // for the sake of simplicity, we do not implement this (premature) optimization. Multiple - // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint - // intervals or when concurrent checkpointing is enabled. - commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); - } + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); } } From 3f6c92386211052bd975865ab6682fd4496bccf6 Mon Sep 17 00:00:00 2001 From: Ilia Chibaev Date: Tue, 11 Nov 2025 12:43:59 +0100 Subject: [PATCH 3/4] address PR comments --- .../apache/iceberg/flink/sink/dynamic/DynamicCommitter.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 447c8a7c00e9..36000b456863 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -304,7 +304,7 @@ private void commitDeltaTxn( String newFlinkJobId, String operatorId) { if (summary.deleteFilesCount() == 0) { - // To be compatible with iceberg format V1. 
+ // Use append snapshot operation where possible AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); for (List resultList : pendingResults.values()) { for (WriteResult result : resultList) { @@ -314,15 +314,13 @@ private void commitDeltaTxn( Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); } } - String description = "append"; - // fail all commits as really its only one commitOperation( table, branch, appendFiles, summary, - description, + "append", newFlinkJobId, operatorId, pendingResults.lastKey()); From 4a852ad692be0efa42a8fb7446a9085ac59e5c38 Mon Sep 17 00:00:00 2001 From: Ilia Chibaev Date: Tue, 11 Nov 2025 12:44:16 +0100 Subject: [PATCH 4/4] test append snapshot operation --- .../sink/dynamic/TestDynamicCommitter.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 7894428a781f..218cc406a733 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -591,6 +591,71 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { .build()); } + @Test + void testCommitDeltaTxnWithAppendFiles() throws Exception { + Table table = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table.snapshots()).isEmpty(); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + WriteTarget writeTarget1 = + new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2)); + WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet()); + + WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId = 1; + + byte[] deltaManifest1 = + aggregator.writeToManifest( + writeTarget1, + Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), + checkpointId); + + CommitRequest commitRequest1 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + + byte[] deltaManifest2 = + aggregator.writeToManifest( + writeTarget2, + Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), + checkpointId); + + CommitRequest commitRequest2 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter dynamicCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2)); + + table.refresh(); + assertThat(table.snapshots()).hasSize(1); + 
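+    // Both write results carry only data files for the same checkpoint, so the committer is
+    // expected to take the append path and commit all pending results as a single append snapshot.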
+ Snapshot snapshot = Iterables.getFirst(table.snapshots(), null); + assertThat(snapshot.operation()).isEqualTo("append"); + } + @Test void testReplacePartitions() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));