Skip to content

Commit 323aa5a

Browse files
committed
adding more tests, and fixing small bug
1 parent 71bebbe commit 323aa5a

File tree

2 files changed

+265
-10
lines changed

2 files changed

+265
-10
lines changed

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplier.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public TableMetadata applySnapshots(
8585

8686
// Validate, apply, record metrics (in correct order)
8787
diff.validate();
88-
TableMetadata result = diff.applyTo();
88+
TableMetadata result = diff.apply();
8989
diff.recordMetrics();
9090
return result;
9191
}
@@ -365,7 +365,7 @@ private void validateDeletedSnapshotsNotReferenced() {
365365
}
366366
}
367367

368-
TableMetadata applyTo() {
368+
TableMetadata apply() {
369369
TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(this.providedMetadata);
370370

371371
/**
@@ -378,22 +378,28 @@ TableMetadata applyTo() {
378378
*
379379
* <p>[3] Cherry-picked snapshots - existing snapshots, branch pointer set below
380380
*/
381-
// Add staged snapshots in timestamp order (explicit ordering for consistency)
381+
// Add staged snapshots in sequence number order (ensures correct commit ordering)
382382
this.newStagedSnapshots.stream()
383-
.sorted(java.util.Comparator.comparingLong(Snapshot::timestampMillis))
383+
.sorted(java.util.Comparator.comparingLong(Snapshot::sequenceNumber))
384384
.forEach(metadataBuilder::addSnapshot);
385385

386-
// Add new main branch snapshots in timestamp order (explicit ordering)
387-
// Note: While the branch pointer (not list order) determines currentSnapshot(),
388-
// other code assumes snapshots are time-ordered (e.g., validation at line 308)
389-
this.newMainBranchSnapshots.stream()
390-
.sorted(java.util.Comparator.comparingLong(Snapshot::timestampMillis))
391-
.forEach(metadataBuilder::addSnapshot);
386+
// Add new main branch snapshots in sequence number order (ensures correct commit ordering)
387+
List<Snapshot> sortedMainBranchSnapshots =
388+
this.newMainBranchSnapshots.stream()
389+
.sorted(java.util.Comparator.comparingLong(Snapshot::sequenceNumber))
390+
.collect(Collectors.toList());
391+
sortedMainBranchSnapshots.forEach(metadataBuilder::addSnapshot);
392392

393393
// Set branch pointer once using providedRefs (covers both new snapshots and cherry-pick)
394394
if (!this.providedRefs.isEmpty()) {
395395
long newSnapshotId = this.providedRefs.get(SnapshotRef.MAIN_BRANCH).snapshotId();
396396
metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH);
397+
} else if (!sortedMainBranchSnapshots.isEmpty()) {
398+
// Auto-append to main: if no refs provided but there are new main branch snapshots,
399+
// set MAIN to the last snapshot (latest by sequence number due to sort above)
400+
Snapshot latestSnapshot =
401+
sortedMainBranchSnapshots.get(sortedMainBranchSnapshots.size() - 1);
402+
metadataBuilder.setBranchSnapshot(latestSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
397403
}
398404

399405
// Delete snapshots

iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplierTest.java

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,4 +594,253 @@ void testApplySnapshots_deletingAllSnapshotsWithNoReplacement_throwsException()
594594

595595
assertTrue(exception.getMessage().contains("Cannot delete the current snapshot"));
596596
}
597+
598+
/**
599+
* Verifies transition from table with unreferenced snapshots to having a MAIN branch. Tests
600+
* ref-only update without snapshot changes.
601+
*/
602+
@Test
603+
void testApplySnapshots_baseWithUnreferencedSnapshotsOnly_addFirstMainBranch()
604+
throws IOException {
605+
List<Snapshot> snapshots = IcebergTestUtil.getSnapshots();
606+
607+
// Create base with snapshots but no refs (all unreferenced)
608+
TableMetadata base = baseMetadata;
609+
for (Snapshot snapshot : snapshots) {
610+
base = TableMetadata.buildFrom(base).addSnapshot(snapshot).build();
611+
}
612+
// Verify no refs in base
613+
assertTrue(base.refs().isEmpty() || !base.refs().containsKey(SnapshotRef.MAIN_BRANCH));
614+
615+
// Provided: same snapshots + MAIN ref to one of them
616+
Snapshot mainSnapshot = snapshots.get(2);
617+
Map<String, String> refs = IcebergTestUtil.obtainSnapshotRefsFromSnapshot(mainSnapshot);
618+
TableMetadata newMetadata = createMetadataWithSnapshots(base, snapshots, refs);
619+
620+
TableMetadata result = snapshotDiffApplier.applySnapshots(base, newMetadata);
621+
622+
// Verify MAIN ref is set
623+
assertNotNull(result.currentSnapshot());
624+
assertEquals(mainSnapshot.snapshotId(), result.currentSnapshot().snapshotId());
625+
626+
// Verify no add/delete operations (ref-only update)
627+
assertEquals(snapshots.size(), result.snapshots().size());
628+
Map<String, String> resultProps = result.properties();
629+
assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)));
630+
assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)));
631+
}
632+
633+
/**
634+
* Verifies table creation with no snapshots (empty state). Tests that an empty table can be
635+
* created successfully.
636+
*/
637+
@Test
638+
void testApplySnapshots_nullBaseEmptySnapshotsEmptyRefs_createsEmptyTable() {
639+
// Provided: empty snapshots list, empty refs
640+
TableMetadata newMetadata =
641+
createMetadataWithSnapshots(baseMetadata, Collections.emptyList(), new HashMap<>());
642+
643+
TableMetadata result = snapshotDiffApplier.applySnapshots(null, newMetadata);
644+
645+
// Verify empty table created
646+
assertNotNull(result);
647+
assertEquals(0, result.snapshots().size());
648+
assertNull(result.currentSnapshot());
649+
assertTrue(result.refs().isEmpty() || !result.refs().containsKey(SnapshotRef.MAIN_BRANCH));
650+
651+
// Verify no snapshot operations tracked
652+
Map<String, String> resultProps = result.properties();
653+
assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)));
654+
assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS)));
655+
assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)));
656+
}
657+
658+
/**
659+
* Verifies adding both regular and staged snapshots in a single commit. Tests that snapshot
660+
* categorization correctly handles mixed types.
661+
*/
662+
@Test
663+
void testApplySnapshots_addRegularAndStagedSimultaneously() throws IOException {
664+
// Start from empty base (no existing snapshots)
665+
// Simulate a commit that adds both regular and staged snapshots simultaneously
666+
667+
List<Snapshot> extraSnapshots = IcebergTestUtil.getExtraSnapshots();
668+
669+
// Create a custom WAP snapshot without a hardcoded sequence number to avoid conflicts
670+
// Build the snapshot JSON manually and wrap it in a JSON array using Gson
671+
String wapSnapshotJson =
672+
String.format(
673+
"{\"snapshot-id\":%d,\"timestamp-ms\":%d,\"summary\":%s,\"manifest-list\":\"%s\",\"schema-id\":%d}",
674+
999940701710231339L,
675+
1669126937912L,
676+
new Gson()
677+
.toJson(
678+
Map.of(
679+
"operation", "append",
680+
"wap.id", "test-wap",
681+
"spark.app.id", "local-1669126906634",
682+
"added-data-files", "1",
683+
"added-records", "1")),
684+
"/data/test.avro",
685+
0);
686+
String wapSnapshotArrayJson = new Gson().toJson(List.of(wapSnapshotJson));
687+
List<Snapshot> customWapSnapshots = SnapshotsUtil.parseSnapshots(null, wapSnapshotArrayJson);
688+
689+
List<Snapshot> allSnapshots = new ArrayList<>();
690+
allSnapshots.add(extraSnapshots.get(0)); // New regular snapshot
691+
allSnapshots.add(customWapSnapshots.get(0)); // New staged snapshot
692+
693+
// MAIN ref points to the new regular snapshot
694+
Map<String, String> refs =
695+
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(extraSnapshots.get(0));
696+
TableMetadata newMetadata = createMetadataWithSnapshots(baseMetadata, allSnapshots, refs);
697+
698+
TableMetadata result = snapshotDiffApplier.applySnapshots(null, newMetadata);
699+
700+
// Verify both snapshots added
701+
assertEquals(2, result.snapshots().size());
702+
703+
// Verify regular snapshot is on MAIN
704+
assertNotNull(result.currentSnapshot());
705+
assertEquals(extraSnapshots.get(0).snapshotId(), result.currentSnapshot().snapshotId());
706+
707+
// Verify tracking: regular appended, staged tracked separately
708+
Map<String, String> resultProps = result.properties();
709+
String appendedSnapshotsStr =
710+
resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS));
711+
String stagedSnapshotsStr =
712+
resultProps.get(getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS));
713+
714+
assertNotNull(appendedSnapshotsStr);
715+
assertTrue(appendedSnapshotsStr.contains(Long.toString(extraSnapshots.get(0).snapshotId())));
716+
717+
assertNotNull(stagedSnapshotsStr);
718+
assertTrue(stagedSnapshotsStr.contains(Long.toString(customWapSnapshots.get(0).snapshotId())));
719+
}
720+
721+
/**
722+
* Verifies cherry-picking a staged snapshot while adding a new snapshot in the same commit. Tests
723+
* compound operation tracking.
724+
*/
725+
@Test
726+
void testApplySnapshots_cherryPickAndAddNewSimultaneously() throws IOException {
727+
List<Snapshot> testWapSnapshots = IcebergTestUtil.getWapSnapshots();
728+
729+
// Base: MAIN snapshot + staged snapshot
730+
TableMetadata base =
731+
TableMetadata.buildFrom(baseMetadata)
732+
.setBranchSnapshot(testWapSnapshots.get(0), SnapshotRef.MAIN_BRANCH)
733+
.addSnapshot(testWapSnapshots.get(1)) // Staged snapshot
734+
.build();
735+
736+
// Provided: existing snapshots plus a new snapshot that becomes MAIN; the staged one is cherry-picked
737+
List<Snapshot> allSnapshots = new ArrayList<>();
738+
allSnapshots.add(testWapSnapshots.get(0));
739+
allSnapshots.add(testWapSnapshots.get(1)); // Was staged, now cherry-picked
740+
allSnapshots.add(testWapSnapshots.get(2)); // New snapshot
741+
742+
// MAIN ref points to new snapshot
743+
Map<String, String> refs =
744+
IcebergTestUtil.obtainSnapshotRefsFromSnapshot(testWapSnapshots.get(2));
745+
TableMetadata newMetadata = createMetadataWithSnapshots(base, allSnapshots, refs);
746+
747+
TableMetadata result = snapshotDiffApplier.applySnapshots(base, newMetadata);
748+
749+
// Verify new snapshot is on MAIN
750+
assertNotNull(result.currentSnapshot());
751+
assertEquals(testWapSnapshots.get(2).snapshotId(), result.currentSnapshot().snapshotId());
752+
753+
// Verify both operations tracked
754+
Map<String, String> resultProps = result.properties();
755+
String appendedSnapshotsStr =
756+
resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS));
757+
String cherryPickedSnapshotsStr =
758+
resultProps.get(getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS));
759+
760+
// New snapshot should be appended
761+
assertNotNull(appendedSnapshotsStr);
762+
assertTrue(appendedSnapshotsStr.contains(Long.toString(testWapSnapshots.get(2).snapshotId())));
763+
764+
// Staged snapshot should be cherry-picked
765+
assertNotNull(cherryPickedSnapshotsStr);
766+
assertTrue(
767+
cherryPickedSnapshotsStr.contains(Long.toString(testWapSnapshots.get(1).snapshotId())));
768+
}
769+
770+
/**
771+
* Verifies that attempting to delete the current snapshot while unreferenced snapshots exist
772+
* throws an exception. Tests current snapshot protection.
773+
*/
774+
@Test
775+
void testApplySnapshots_attemptDeleteCurrentWithUnreferencedPresent_throwsException()
776+
throws IOException {
777+
List<Snapshot> snapshots = IcebergTestUtil.getSnapshots();
778+
779+
// Base: MAIN snapshot + 2 unreferenced snapshots
780+
TableMetadata base =
781+
TableMetadata.buildFrom(baseMetadata)
782+
.addSnapshot(snapshots.get(0)) // Unreferenced
783+
.addSnapshot(snapshots.get(1)) // Unreferenced
784+
.setBranchSnapshot(snapshots.get(2), SnapshotRef.MAIN_BRANCH) // Current snapshot
785+
.build();
786+
787+
// Provided: only the 2 unreferenced snapshots (MAIN's current snapshot is omitted), no new snapshots
788+
List<Snapshot> remainingSnapshots = snapshots.subList(0, 2);
789+
TableMetadata newMetadata =
790+
createMetadataWithSnapshots(base, remainingSnapshots, new HashMap<>());
791+
792+
// Should throw exception because current snapshot is being deleted without replacement
793+
InvalidIcebergSnapshotException exception =
794+
assertThrows(
795+
InvalidIcebergSnapshotException.class,
796+
() -> snapshotDiffApplier.applySnapshots(base, newMetadata));
797+
798+
assertTrue(exception.getMessage().contains("Cannot delete the current snapshot"));
799+
assertTrue(exception.getMessage().contains(Long.toString(snapshots.get(2).snapshotId())));
800+
}
801+
802+
/**
803+
* Verifies adding regular (non-WAP) snapshots with empty refs. Historically, such snapshots were
804+
* automatically added to MAIN branch and tracked as APPENDED_SNAPSHOTS. This test validates
805+
* backward compatibility with that behavior. NOTE: The semantics here are questionable -
806+
* snapshots with no refs should arguably not be "appended" to MAIN, but this preserves the
807+
* original behavior.
808+
*/
809+
@Test
810+
void testApplySnapshots_regularSnapshotsWithEmptyRefs_autoAppendedToMain() throws IOException {
811+
List<Snapshot> baseSnapshots = IcebergTestUtil.getSnapshots();
812+
TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, baseSnapshots);
813+
814+
// Provided: existing + new snapshots, but empty refs map (no MAIN branch)
815+
List<Snapshot> extraSnapshots = IcebergTestUtil.getExtraSnapshots();
816+
List<Snapshot> allSnapshots = new ArrayList<>(baseSnapshots);
817+
allSnapshots.addAll(extraSnapshots);
818+
819+
// Empty refs - no MAIN branch
820+
TableMetadata newMetadata =
821+
createMetadataWithSnapshots(baseWithSnapshots, allSnapshots, new HashMap<>());
822+
823+
TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata);
824+
825+
// Verify new snapshots added
826+
assertEquals(allSnapshots.size(), result.snapshots().size());
827+
828+
// Verify MAIN branch points to the latest snapshot (auto-appended to main)
829+
assertNotNull(result.ref(SnapshotRef.MAIN_BRANCH));
830+
assertEquals(
831+
allSnapshots.get(allSnapshots.size() - 1).snapshotId(),
832+
result.ref(SnapshotRef.MAIN_BRANCH).snapshotId());
833+
834+
// Verify new snapshots tracked as appended (even though unreferenced, they're not staged WAP)
835+
Map<String, String> resultProps = result.properties();
836+
String appendedSnapshotsStr =
837+
resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS));
838+
839+
assertNotNull(appendedSnapshotsStr);
840+
for (Snapshot extraSnapshot : extraSnapshots) {
841+
assertTrue(
842+
appendedSnapshotsStr.contains(Long.toString(extraSnapshot.snapshotId())),
843+
"Snapshot " + extraSnapshot.snapshotId() + " should be tracked as appended");
844+
}
845+
}
597846
}

0 commit comments

Comments
 (0)