Skip to content

Commit b51d5fb

Browse files
committed
responding to comments, adding test
1 parent 323aa5a commit b51d5fb

File tree

4 files changed: +145 -52 lines changed

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplier.java

Lines changed: 31 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -172,7 +172,10 @@ static SnapshotDiff create(
172172
// Categorize snapshots
173173
List<Snapshot> newStagedSnapshots =
174174
newSnapshots.stream()
175-
.filter(s -> s.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP))
175+
.filter(
176+
s ->
177+
s.summary() != null
178+
&& s.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP))
176179
.collect(Collectors.toList());
177180

178181
// Compute source IDs for cherry-pick operations
@@ -220,7 +223,10 @@ static SnapshotDiff create(
220223
// (includes both regular commits and cherry-pick result snapshots)
221224
List<Snapshot> newMainBranchSnapshots =
222225
newSnapshots.stream()
223-
.filter(s -> !s.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP))
226+
.filter(
227+
s ->
228+
s.summary() == null
229+
|| !s.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP))
224230
.collect(Collectors.toList());
225231

226232
// Compute appended count
@@ -373,33 +379,33 @@ TableMetadata apply() {
373379
*
374380
* <p>[1] Staged (WAP) snapshots - added without branch reference
375381
*
376-
* <p>[2] New main branch snapshots - added without branch reference (branch pointer set
377-
* below)
382+
* <p>[2] New main branch snapshots - added and moved to MAIN branch incrementally
383+
*
384+
* <p>[3] Cherry-picked snapshots - existing snapshots, final branch pointer set below
378385
*
379-
* <p>[3] Cherry-picked snapshots - existing snapshots, branch pointer set below
386+
* <p>We trust the client-provided order rather than sorting. Sequence numbers are
387+
* monotonically increasing along a branch's lineage (following parent pointers) for both
388+
* cherry-pick result snapshots and fast-forward snapshots. Iceberg's setBranchSnapshot()
389+
* validates sequence numbers, so we can rely on its built-in validation.
380390
*/
381-
// Add staged snapshots in sequence number order (ensures correct commit ordering)
382-
this.newStagedSnapshots.stream()
383-
.sorted(java.util.Comparator.comparingLong(Snapshot::sequenceNumber))
384-
.forEach(metadataBuilder::addSnapshot);
385-
386-
// Add new main branch snapshots in sequence number order (ensures correct commit ordering)
387-
List<Snapshot> sortedMainBranchSnapshots =
388-
this.newMainBranchSnapshots.stream()
389-
.sorted(java.util.Comparator.comparingLong(Snapshot::sequenceNumber))
390-
.collect(Collectors.toList());
391-
sortedMainBranchSnapshots.forEach(metadataBuilder::addSnapshot);
391+
// Add staged snapshots in client-provided order
392+
this.newStagedSnapshots.forEach(metadataBuilder::addSnapshot);
393+
394+
// Add new main branch snapshots and move MAIN pointer incrementally
395+
// This works for both:
396+
// - Regular commits: newly created snapshots
397+
// - Cherry-pick results: newly created snapshots with SOURCE_SNAPSHOT_ID_PROP
398+
for (Snapshot snapshot : this.newMainBranchSnapshots) {
399+
metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
400+
}
392401

393-
// Set branch pointer once using providedRefs (covers both new snapshots and cherry-pick)
394-
if (!this.providedRefs.isEmpty()) {
395-
long newSnapshotId = this.providedRefs.get(SnapshotRef.MAIN_BRANCH).snapshotId();
402+
// Set final branch pointer using providedRefs if present
403+
// This handles fast-forward for cherry-pick/WAP publish where we're moving the branch
404+
// to an existing snapshot
405+
SnapshotRef mainBranchRef = this.providedRefs.get(SnapshotRef.MAIN_BRANCH);
406+
if (mainBranchRef != null) {
407+
long newSnapshotId = mainBranchRef.snapshotId();
396408
metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH);
397-
} else if (!sortedMainBranchSnapshots.isEmpty()) {
398-
// Auto-append to main: if no refs provided but there are new main branch snapshots,
399-
// set MAIN to the last snapshot (latest by sequence number due to sort above)
400-
Snapshot latestSnapshot =
401-
sortedMainBranchSnapshots.get(sortedMainBranchSnapshots.size() - 1);
402-
metadataBuilder.setBranchSnapshot(latestSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
403409
}
404410

405411
// Delete snapshots

iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/IcebergTestUtil.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -44,15 +44,14 @@ private static List<Snapshot> loadSnapshots(String snapshotFile) throws IOExcept
4444
return SnapshotsUtil.parseSnapshots(null, data);
4545
}
4646

47-
public static Map<String, String> obtainSnapshotRefsFromSnapshot(Snapshot snapshot) {
47+
public static Map<String, String> createMainBranchRefPointingTo(Snapshot snapshot) {
4848
Map<String, String> snapshotRefs = new HashMap<>();
4949
SnapshotRef snapshotRef = SnapshotRef.branchBuilder(snapshot.snapshotId()).build();
5050
snapshotRefs.put(SnapshotRef.MAIN_BRANCH, SnapshotRefParser.toJson(snapshotRef));
5151
return snapshotRefs;
5252
}
5353

54-
public static Map<String, String> obtainSnapshotRefsFromSnapshot(
55-
Snapshot snapshot, String branch) {
54+
public static Map<String, String> createBranchRefPointingTo(Snapshot snapshot, String branch) {
5655
Map<String, String> snapshotRefs = new HashMap<>();
5756
SnapshotRef snapshotRef = SnapshotRef.branchBuilder(snapshot.snapshotId()).build();
5857
snapshotRefs.put(branch, SnapshotRefParser.toJson(snapshotRef));

iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java

Lines changed: 17 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -148,7 +148,7 @@ void testDoCommitAppendSnapshotsInitialVersion() throws IOException {
148148
properties.put(
149149
CatalogConstants.SNAPSHOTS_REFS_KEY,
150150
SnapshotsUtil.serializeMap(
151-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(
151+
IcebergTestUtil.createMainBranchRefPointingTo(
152152
testSnapshots.get(testSnapshots.size() - 1))));
153153

154154
TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(properties);
@@ -193,7 +193,7 @@ void testDoCommitAppendSnapshotsExistingVersion() throws IOException {
193193
properties.put(
194194
CatalogConstants.SNAPSHOTS_REFS_KEY,
195195
SnapshotsUtil.serializeMap(
196-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(
196+
IcebergTestUtil.createMainBranchRefPointingTo(
197197
testSnapshots.get(testSnapshots.size() - 1))));
198198
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
199199

@@ -248,7 +248,7 @@ void testDoCommitAppendAndDeleteSnapshots() throws IOException {
248248
properties.put(
249249
CatalogConstants.SNAPSHOTS_REFS_KEY,
250250
SnapshotsUtil.serializeMap(
251-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(
251+
IcebergTestUtil.createMainBranchRefPointingTo(
252252
newSnapshots.get(newSnapshots.size() - 1))));
253253
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
254254

@@ -438,7 +438,7 @@ void testDoCommitDeleteSnapshots() throws IOException {
438438
properties.put(
439439
CatalogConstants.SNAPSHOTS_REFS_KEY,
440440
SnapshotsUtil.serializeMap(
441-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(
441+
IcebergTestUtil.createMainBranchRefPointingTo(
442442
testSnapshots.get(testSnapshots.size() - 1))));
443443
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
444444

@@ -640,7 +640,7 @@ void testDoCommitSnapshotsValidationThrowsException() throws IOException {
640640
properties.put(
641641
CatalogConstants.SNAPSHOTS_REFS_KEY,
642642
SnapshotsUtil.serializeMap(
643-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(
643+
IcebergTestUtil.createMainBranchRefPointingTo(
644644
testSnapshots.get(1)))); // But main refs snapshot 1
645645
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
646646
metadata = metadata.replaceProperties(properties);
@@ -727,7 +727,7 @@ void testDoCommitAppendStageOnlySnapshotsExistingVersion() throws IOException {
727727
properties.put(
728728
CatalogConstants.SNAPSHOTS_REFS_KEY,
729729
SnapshotsUtil.serializeMap(
730-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(newSnapshots.get(0))));
730+
IcebergTestUtil.createMainBranchRefPointingTo(newSnapshots.get(0))));
731731
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
732732

733733
TableMetadata metadata = base.replaceProperties(properties);
@@ -771,7 +771,7 @@ void testAppendSnapshotsWithOldSnapshots() throws IOException {
771771
properties.put(
772772
CatalogConstants.SNAPSHOTS_REFS_KEY,
773773
SnapshotsUtil.serializeMap(
774-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(snapshots.get(snapshots.size() - 1))));
774+
IcebergTestUtil.createMainBranchRefPointingTo(snapshots.get(snapshots.size() - 1))));
775775

776776
TableMetadata newMetadata = baseMetadata.replaceProperties(properties);
777777

@@ -790,7 +790,7 @@ void testAppendSnapshotsWithOldSnapshots() throws IOException {
790790
propertiesWithFuture.put(
791791
CatalogConstants.SNAPSHOTS_REFS_KEY,
792792
SnapshotsUtil.serializeMap(
793-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(snapshots.get(snapshots.size() - 1))));
793+
IcebergTestUtil.createMainBranchRefPointingTo(snapshots.get(snapshots.size() - 1))));
794794

795795
TableMetadata newMetadataWithFuture = baseMetadata.replaceProperties(propertiesWithFuture);
796796
openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots(
@@ -823,7 +823,7 @@ void testDoCommitCherryPickSnapshotBaseUnchanged() throws IOException {
823823
properties.put(
824824
CatalogConstants.SNAPSHOTS_REFS_KEY,
825825
SnapshotsUtil.serializeMap(
826-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(testWapSnapshots.get(0))));
826+
IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(0))));
827827
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
828828

829829
TableMetadata metadata = base.replaceProperties(properties);
@@ -864,7 +864,7 @@ void testDoCommitCherryPickSnapshotBaseChanged() throws IOException {
864864
properties.put(
865865
CatalogConstants.SNAPSHOTS_REFS_KEY,
866866
SnapshotsUtil.serializeMap(
867-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(
867+
IcebergTestUtil.createMainBranchRefPointingTo(
868868
testWapSnapshots.get(2)))); // new snapshot
869869
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
870870

@@ -905,7 +905,7 @@ void testDoCommitCherryPickFirstSnapshot() throws IOException {
905905
properties.put(
906906
CatalogConstants.SNAPSHOTS_REFS_KEY,
907907
SnapshotsUtil.serializeMap(
908-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(testWapSnapshots.get(0))));
908+
IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(0))));
909909
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
910910

911911
TableMetadata metadata = base.replaceProperties(properties);
@@ -1475,7 +1475,7 @@ void testDeleteSnapshotWithNoReference() throws IOException {
14751475
properties.put(
14761476
CatalogConstants.SNAPSHOTS_REFS_KEY,
14771477
SnapshotsUtil.serializeMap(
1478-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(referencedSnapshot)));
1478+
IcebergTestUtil.createMainBranchRefPointingTo(referencedSnapshot)));
14791479

14801480
TableMetadata newMetadata = baseMetadata.replaceProperties(properties);
14811481

@@ -1537,7 +1537,7 @@ void testDeleteEmptySnapshotList() throws IOException {
15371537
SnapshotsUtil.serializedSnapshots(baseMetadata.snapshots()));
15381538
properties.put(
15391539
CatalogConstants.SNAPSHOTS_REFS_KEY,
1540-
SnapshotsUtil.serializeMap(IcebergTestUtil.obtainSnapshotRefsFromSnapshot(lastSnapshot)));
1540+
SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot)));
15411541

15421542
TableMetadata newMetadata = baseMetadata.replaceProperties(properties);
15431543

@@ -1583,7 +1583,7 @@ void testDeleteNullSnapshotList() throws IOException {
15831583
SnapshotsUtil.serializedSnapshots(baseMetadata.snapshots()));
15841584
properties.put(
15851585
CatalogConstants.SNAPSHOTS_REFS_KEY,
1586-
SnapshotsUtil.serializeMap(IcebergTestUtil.obtainSnapshotRefsFromSnapshot(lastSnapshot)));
1586+
SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot)));
15871587

15881588
TableMetadata newMetadata = baseMetadata.replaceProperties(properties);
15891589

@@ -1633,7 +1633,7 @@ void testDeleteNonExistentSnapshot() throws IOException {
16331633
SnapshotsUtil.serializedSnapshots(baseMetadata.snapshots()));
16341634
properties.put(
16351635
CatalogConstants.SNAPSHOTS_REFS_KEY,
1636-
SnapshotsUtil.serializeMap(IcebergTestUtil.obtainSnapshotRefsFromSnapshot(lastSnapshot)));
1636+
SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot)));
16371637

16381638
TableMetadata newMetadata = baseMetadata.replaceProperties(properties);
16391639

@@ -1721,7 +1721,7 @@ void testDeleteSnapshotMetricsRecordedBranch() throws IOException {
17211721
properties.put(
17221722
CatalogConstants.SNAPSHOTS_REFS_KEY,
17231723
SnapshotsUtil.serializeMap(
1724-
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(referencedSnapshot)));
1724+
IcebergTestUtil.createMainBranchRefPointingTo(referencedSnapshot)));
17251725

17261726
TableMetadata newMetadata = baseMetadata.replaceProperties(properties);
17271727

@@ -1768,7 +1768,7 @@ void testDeleteSnapshotMetricsRecordedNonExistent() throws IOException {
17681768
SnapshotsUtil.serializedSnapshots(finalBaseMetadata.snapshots()));
17691769
properties.put(
17701770
CatalogConstants.SNAPSHOTS_REFS_KEY,
1771-
SnapshotsUtil.serializeMap(IcebergTestUtil.obtainSnapshotRefsFromSnapshot(lastSnapshot)));
1771+
SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot)));
17721772

17731773
TableMetadata newMetadata = finalBaseMetadata.replaceProperties(properties);
17741774

0 commit comments

Comments
 (0)