diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java index f77f7a0cb..2743c6579 100644 --- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java +++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java @@ -59,22 +59,24 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog { @Autowired StorageType storageType; - @Autowired SnapshotInspector snapshotInspector; - @Autowired HouseTableMapper houseTableMapper; @Autowired MeterRegistry meterRegistry; @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + FileIO fileIO = resolveFileIO(tableIdentifier); + MetricsReporter metricsReporter = + new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()); + SnapshotDiffApplier snapshotDiffApplier = new SnapshotDiffApplier(metricsReporter); return new OpenHouseInternalTableOperations( houseTableRepository, - resolveFileIO(tableIdentifier), - snapshotInspector, + fileIO, houseTableMapper, tableIdentifier, - new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()), - fileIOManager); + metricsReporter, + fileIOManager, + snapshotDiffApplier); } @Override diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java index d9fa34257..d96d9d6b1 100644 --- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java +++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java @@ -23,28 +23,19 @@ import java.io.IOException; import java.time.Clock; import java.time.Instant; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotRef; -import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SortDirection; import org.apache.iceberg.SortField; import org.apache.iceberg.SortOrder; @@ -60,7 +51,6 @@ import org.apache.iceberg.expressions.Term; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Objects; -import org.springframework.data.util.Pair; @AllArgsConstructor @Slf4j @@ -70,8 +60,6 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio FileIO fileIO; - SnapshotInspector snapshotInspector; - HouseTableMapper houseTableMapper; TableIdentifier tableIdentifier; @@ -80,6 +68,8 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio FileIOManager fileIOManager; + SnapshotDiffApplier snapshotDiffApplier; + private static final Gson GSON = new Gson(); private static final Cache CACHE = @@ -229,6 +219,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.EVOLVED_SCHEMA_KEY, true); } + metadata = snapshotDiffApplier.applySnapshots(base, metadata); + int version = currentVersion() + 1; CommitStatus commitStatus = CommitStatus.FAILURE; @@ -260,8 +252,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (properties.containsKey(CatalogConstants.EVOLVED_SCHEMA_KEY)) { properties.remove(CatalogConstants.EVOLVED_SCHEMA_KEY); } - String serializedSnapshotsToPut = properties.remove(CatalogConstants.SNAPSHOTS_JSON_KEY); - String serializedSnapshotRefs = properties.remove(CatalogConstants.SNAPSHOTS_REFS_KEY); boolean isStageCreate = Boolean.parseBoolean(properties.remove(CatalogConstants.IS_STAGE_CREATE_KEY)); String sortOrderJson = properties.remove(CatalogConstants.SORT_ORDER_KEY); @@ -274,24 +264,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { updatedMetadata = updatedMetadata.replaceSortOrder(sortOrder); } - if (serializedSnapshotsToPut != null) { - List snapshotsToPut = - SnapshotsUtil.parseSnapshots(fileIO, serializedSnapshotsToPut); - Pair, List> snapshotsDiff = - SnapshotsUtil.symmetricDifferenceSplit(snapshotsToPut, updatedMetadata.snapshots()); - List appendedSnapshots = snapshotsDiff.getFirst(); - List deletedSnapshots = snapshotsDiff.getSecond(); - snapshotInspector.validateSnapshotsUpdate( - updatedMetadata, appendedSnapshots, deletedSnapshots); - Map snapshotRefs = - serializedSnapshotRefs == null - ? new HashMap<>() - : SnapshotsUtil.parseSnapshotRefs(serializedSnapshotRefs); - updatedMetadata = - maybeAppendSnapshots(updatedMetadata, appendedSnapshots, snapshotRefs, true); - updatedMetadata = maybeDeleteSnapshots(updatedMetadata, deletedSnapshots); - } - final TableMetadata updatedMtDataRef = updatedMetadata; long metadataUpdateStartTime = System.currentTimeMillis(); try { @@ -531,125 +503,6 @@ private void failIfRetryUpdate(Map properties) { } } - public TableMetadata maybeDeleteSnapshots( - TableMetadata metadata, List snapshotsToDelete) { - TableMetadata result = metadata; - if (CollectionUtils.isNotEmpty(snapshotsToDelete)) { - Set snapshotIds = - snapshotsToDelete.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); - Map updatedProperties = new HashMap<>(result.properties()); - updatedProperties.put( - getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS), - snapshotsToDelete.stream() - .map(s -> Long.toString(s.snapshotId())) - .collect(Collectors.joining(","))); - result = - TableMetadata.buildFrom(result) - .setProperties(updatedProperties) - .build() - .removeSnapshotsIf(s -> snapshotIds.contains(s.snapshotId())); - metricsReporter.count( - InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR, snapshotsToDelete.size()); - } - return result; - } - - public TableMetadata maybeAppendSnapshots( - TableMetadata metadata, - List snapshotsToAppend, - Map snapshotRefs, - boolean recordAction) { - TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(metadata); - List appendedSnapshots = new ArrayList<>(); - List stagedSnapshots = new ArrayList<>(); - List cherryPickedSnapshots = new ArrayList<>(); - // Throw an exception if client sent request that included non-main branches in the - // snapshotRefs. - for (Map.Entry entry : snapshotRefs.entrySet()) { - if (!entry.getKey().equals(SnapshotRef.MAIN_BRANCH)) { - throw new UnsupportedOperationException("OpenHouse supports only MAIN branch"); - } - } - /** - * First check if there are new snapshots to be appended to current TableMetadata. If yes, - * following are the cases to be handled: - * - *

[1] A regular (non-wap) snapshot is being added to the MAIN branch. - * - *

[2] A staged (wap) snapshot is being created on top of current snapshot as its base. - * Recognized by STAGED_WAP_ID_PROP. - * - *

[3] A staged (wap) snapshot is being cherry picked to the MAIN branch wherein current - * snapshot in the MAIN branch is not the same as the base snapshot the staged (wap) snapshot - * was created on. Recognized by SOURCE_SNAPSHOT_ID_PROP. This case is called non-fast forward - * cherry pick. - * - *

In case no new snapshots are to be appended to current TableMetadata, there could be a - * cherrypick of a staged (wap) snapshot on top of the current snapshot in the MAIN branch which - * is the same as the base snapshot the staged (wap) snapshot was created on. This case is - * called fast forward cherry pick. - */ - if (CollectionUtils.isNotEmpty(snapshotsToAppend)) { - for (Snapshot snapshot : snapshotsToAppend) { - snapshotInspector.validateSnapshot(snapshot); - if (snapshot.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP)) { - // a stage only snapshot using wap.id - metadataBuilder.addSnapshot(snapshot); - stagedSnapshots.add(String.valueOf(snapshot.snapshotId())); - } else if (snapshot.summary().containsKey(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)) { - // a snapshot created on a non fast-forward cherry-pick snapshot - metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH); - appendedSnapshots.add(String.valueOf(snapshot.snapshotId())); - cherryPickedSnapshots.add( - String.valueOf(snapshot.summary().get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP))); - } else { - // a regular snapshot - metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH); - appendedSnapshots.add(String.valueOf(snapshot.snapshotId())); - } - } - } else if (MapUtils.isNotEmpty(snapshotRefs)) { - // Updated ref in the main branch with no new snapshot means this is a - // fast-forward cherry-pick or rollback operation. - long newSnapshotId = snapshotRefs.get(SnapshotRef.MAIN_BRANCH).snapshotId(); - // Either the current snapshot is null or the current snapshot is not equal - // to the new snapshot indicates an update. The first case happens when the - // stage/wap snapshot being cherry-picked is the first snapshot. - if (MapUtils.isEmpty(metadata.refs()) - || metadata.refs().get(SnapshotRef.MAIN_BRANCH).snapshotId() != newSnapshotId) { - metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH); - cherryPickedSnapshots.add(String.valueOf(newSnapshotId)); - } - } - if (recordAction) { - Map updatedProperties = new HashMap<>(metadata.properties()); - if (CollectionUtils.isNotEmpty(appendedSnapshots)) { - updatedProperties.put( - getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS), - appendedSnapshots.stream().collect(Collectors.joining(","))); - metricsReporter.count( - InternalCatalogMetricsConstant.SNAPSHOTS_ADDED_CTR, appendedSnapshots.size()); - } - if (CollectionUtils.isNotEmpty(stagedSnapshots)) { - updatedProperties.put( - getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS), - stagedSnapshots.stream().collect(Collectors.joining(","))); - metricsReporter.count( - InternalCatalogMetricsConstant.SNAPSHOTS_STAGED_CTR, stagedSnapshots.size()); - } - if (CollectionUtils.isNotEmpty(cherryPickedSnapshots)) { - updatedProperties.put( - getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS), - cherryPickedSnapshots.stream().collect(Collectors.joining(","))); - metricsReporter.count( - InternalCatalogMetricsConstant.SNAPSHOTS_CHERRY_PICKED_CTR, - cherryPickedSnapshots.size()); - } - metadataBuilder.setProperties(updatedProperties); - } - return metadataBuilder.build(); - } - /** Helper function to dump contents for map in debugging mode. */ private void logPropertiesMap(Map map) { log.debug(" === Printing the table properties within doCommit method === "); diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplier.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplier.java new file mode 100644 index 000000000..345f811ef --- /dev/null +++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplier.java @@ -0,0 +1,495 @@ +package com.linkedin.openhouse.internal.catalog; + +import static com.linkedin.openhouse.internal.catalog.mapper.HouseTableSerdeUtils.getCanonicalFieldName; + +import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter; +import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.TableMetadata; + +/** + * Service responsible for applying snapshot changes to Iceberg table metadata. + * + *

The main entry point applySnapshots() has a clear flow: parse input → compute diff → validate + * → apply. + */ +@AllArgsConstructor +@Slf4j +public class SnapshotDiffApplier { + + private final MetricsReporter metricsReporter; + + /** + * Applies snapshot updates from metadata properties. Simple and clear: parse input, compute diff, + * validate, apply, record metrics, build. + * + * @param existingMetadata The existing table metadata (may be null for table creation) + * @param providedMetadata The new metadata with properties containing snapshot updates + * @return Updated metadata with snapshots applied + * @throws NullPointerException if providedMetadata is null + */ + public TableMetadata applySnapshots( + TableMetadata existingMetadata, TableMetadata providedMetadata) { + // Validate at system boundary + Objects.requireNonNull(providedMetadata, "providedMetadata cannot be null"); + + String snapshotsJson = providedMetadata.properties().get(CatalogConstants.SNAPSHOTS_JSON_KEY); + Map providedRefs = + Optional.ofNullable(providedMetadata.properties().get(CatalogConstants.SNAPSHOTS_REFS_KEY)) + .map(SnapshotsUtil::parseSnapshotRefs) + .orElse(new HashMap<>()); + + // Validate MAIN-only restriction early (PR1 limitation) + for (Map.Entry entry : providedRefs.entrySet()) { + if (!entry.getKey().equals(SnapshotRef.MAIN_BRANCH)) { + throw new UnsupportedOperationException("OpenHouse supports only MAIN branch"); + } + } + + if (snapshotsJson == null) { + return providedMetadata; + } + + // Parse input + List providedSnapshots = SnapshotsUtil.parseSnapshots(null, snapshotsJson); + + List existingSnapshots = + existingMetadata != null ? existingMetadata.snapshots() : Collections.emptyList(); + Map existingRefs = + existingMetadata != null ? existingMetadata.refs() : Collections.emptyMap(); + + // Compute diff (all maps created once in factory method) + SnapshotDiff diff = + SnapshotDiff.create( + metricsReporter, + existingMetadata, + providedMetadata, + existingSnapshots, + providedSnapshots, + existingRefs, + providedRefs); + + // Validate, apply, record metrics (in correct order) + diff.validate(); + TableMetadata result = diff.apply(); + diff.recordMetrics(); + return result; + } + + /** + * State object that computes and caches all snapshot analysis. Computes all maps once in the + * factory method to avoid redundant operations. Provides clear methods for validation and + * application. + */ + private static class SnapshotDiff { + // Injected dependency + private final MetricsReporter metricsReporter; + + // Input state + private final TableMetadata existingMetadata; + private final TableMetadata providedMetadata; + private final String databaseId; + private final List existingSnapshots; + private final List providedSnapshots; + private final Map existingRefs; + private final Map providedRefs; + + // Computed maps (created once) + private final Map providedSnapshotByIds; + private final Map existingSnapshotByIds; + private final Set existingBranchRefIds; + private final Set providedBranchRefIds; + private final List newSnapshots; + private final List deletedSnapshots; + private final Set deletedIds; + + // Categorized snapshots + private final List newStagedSnapshots; + private final List newMainBranchSnapshots; + private final List cherryPickedSnapshots; + private final int appendedCount; + + /** + * Creates a SnapshotDiff by computing all snapshot analysis from the provided inputs. + * + *

Preconditions: All parameters except existingMetadata must be non-null. Collections should + * be empty rather than null. + * + * @param metricsReporter Metrics reporter for recording snapshot operations + * @param existingMetadata The existing table metadata (may be null for table creation) + * @param providedMetadata The new metadata with properties containing snapshot updates + * @param existingSnapshots Snapshots currently in the table + * @param providedSnapshots Snapshots provided in the update + * @param existingRefs Snapshot refs currently in the table + * @param providedRefs Snapshot refs provided in the update + * @return A new SnapshotDiff with all analysis computed + */ + static SnapshotDiff create( + MetricsReporter metricsReporter, + TableMetadata existingMetadata, + TableMetadata providedMetadata, + List existingSnapshots, + List providedSnapshots, + Map existingRefs, + Map providedRefs) { + + // Compute all index maps once + Map providedSnapshotByIds = + providedSnapshots.stream().collect(Collectors.toMap(Snapshot::snapshotId, s -> s)); + Map existingSnapshotByIds = + existingSnapshots.stream().collect(Collectors.toMap(Snapshot::snapshotId, s -> s)); + Set existingBranchRefIds = + existingRefs.values().stream().map(SnapshotRef::snapshotId).collect(Collectors.toSet()); + Set providedBranchRefIds = + providedRefs.values().stream().map(SnapshotRef::snapshotId).collect(Collectors.toSet()); + + // Compute changes + List newSnapshots = + providedSnapshots.stream() + .filter(s -> !existingSnapshotByIds.containsKey(s.snapshotId())) + .collect(Collectors.toList()); + List deletedSnapshots = + existingSnapshots.stream() + .filter(s -> !providedSnapshotByIds.containsKey(s.snapshotId())) + .collect(Collectors.toList()); + Set deletedIds = + deletedSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + + // Categorize snapshots + List newStagedSnapshots = + newSnapshots.stream() + .filter( + s -> + s.summary() != null + && s.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP)) + .collect(Collectors.toList()); + + // Compute source IDs for cherry-pick operations + Set cherryPickSourceIds = + providedSnapshots.stream() + .filter( + s -> + s.summary() != null + && s.summary().containsKey(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)) + .map(s -> Long.parseLong(s.summary().get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP))) + .collect(Collectors.toSet()); + + List cherryPickedSnapshots = + providedSnapshots.stream() + .filter( + provided -> { + // Only consider EXISTING snapshots as cherry-picked + Snapshot existing = existingSnapshotByIds.get(provided.snapshotId()); + if (existing == null) { + return false; + } + + // Is source of cherry-pick + if (cherryPickSourceIds.contains(provided.snapshotId())) { + return true; + } + + // WAP snapshot being published (staged → branch transition) + boolean hasWapId = + provided.summary() != null + && provided.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP); + // TODO: This works for MAIN branch only, but fails in the branch scenario and + // should be revisited in followup PR + // Snapshot exists on branch-A + // Cherry-pick to branch-B + // Would be classified as NOT wasStaged (because it's in existingBranchRefIds) + // Wouldn't be detected as cherry-picked + boolean wasStaged = !existingBranchRefIds.contains(provided.snapshotId()); + boolean isNowOnBranch = providedBranchRefIds.contains(provided.snapshotId()); + return hasWapId && wasStaged && isNowOnBranch; + }) + .collect(Collectors.toList()); + + // New main branch snapshots = all new snapshots that are not staged WAP + // (includes both regular commits and cherry-pick result snapshots) + List newMainBranchSnapshots = + newSnapshots.stream() + .filter( + s -> + s.summary() == null + || !s.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP)) + .collect(Collectors.toList()); + + // Compute appended count + int appendedCount = newMainBranchSnapshots.size(); + + // Extract database ID from metadata properties + String databaseId = + providedMetadata.properties().get(CatalogConstants.OPENHOUSE_DATABASEID_KEY); + + return new SnapshotDiff( + metricsReporter, + existingMetadata, + providedMetadata, + databaseId, + existingSnapshots, + providedSnapshots, + existingRefs, + providedRefs, + providedSnapshotByIds, + existingSnapshotByIds, + existingBranchRefIds, + providedBranchRefIds, + newSnapshots, + deletedSnapshots, + deletedIds, + newStagedSnapshots, + newMainBranchSnapshots, + cherryPickedSnapshots, + appendedCount); + } + + /** Private constructor that accepts all pre-computed values. Use {@link #create} instead. */ + private SnapshotDiff( + MetricsReporter metricsReporter, + TableMetadata existingMetadata, + TableMetadata providedMetadata, + String databaseId, + List existingSnapshots, + List providedSnapshots, + Map existingRefs, + Map providedRefs, + Map providedSnapshotByIds, + Map existingSnapshotByIds, + Set existingBranchRefIds, + Set providedBranchRefIds, + List newSnapshots, + List deletedSnapshots, + Set deletedIds, + List newStagedSnapshots, + List newMainBranchSnapshots, + List cherryPickedSnapshots, + int appendedCount) { + this.metricsReporter = metricsReporter; + this.existingMetadata = existingMetadata; + this.providedMetadata = providedMetadata; + this.databaseId = databaseId; + this.existingSnapshots = existingSnapshots; + this.providedSnapshots = providedSnapshots; + this.existingRefs = existingRefs; + this.providedRefs = providedRefs; + this.providedSnapshotByIds = providedSnapshotByIds; + this.existingSnapshotByIds = existingSnapshotByIds; + this.existingBranchRefIds = existingBranchRefIds; + this.providedBranchRefIds = providedBranchRefIds; + this.newSnapshots = newSnapshots; + this.deletedSnapshots = deletedSnapshots; + this.deletedIds = deletedIds; + this.newStagedSnapshots = newStagedSnapshots; + this.newMainBranchSnapshots = newMainBranchSnapshots; + this.cherryPickedSnapshots = cherryPickedSnapshots; + this.appendedCount = appendedCount; + } + + /** + * Validates all snapshot changes before applying them to table metadata. + * + * @throws InvalidIcebergSnapshotException if any validation check fails + */ + void validate() { + validateCurrentSnapshotNotDeleted(); + validateDeletedSnapshotsNotReferenced(); + } + + /** + * Validates that the current snapshot is not deleted without providing replacement snapshots. + * + * @throws InvalidIcebergSnapshotException if the current snapshot is being deleted without + * replacements + */ + private void validateCurrentSnapshotNotDeleted() { + if (this.existingMetadata == null || this.existingMetadata.currentSnapshot() == null) { + return; + } + if (!this.newSnapshots.isEmpty()) { + return; + } + // TODO -- validate what are the requirements around deleting the latest snapshot on a + // "branch". + long latestSnapshotId = this.existingMetadata.currentSnapshot().snapshotId(); + // Check if the last deleted snapshot is the current one (snapshots are ordered by time) + if (!this.deletedSnapshots.isEmpty() + && this.deletedSnapshots.get(this.deletedSnapshots.size() - 1).snapshotId() + == latestSnapshotId) { + throw new InvalidIcebergSnapshotException( + String.format( + "Cannot delete the current snapshot %s without adding replacement snapshots.", + latestSnapshotId)); + } + } + + /** + * Validates that snapshots being deleted are not still referenced by any branches or tags. This + * prevents data loss and maintains referential integrity by ensuring that all branch and tag + * pointers reference valid snapshots that will continue to exist after the commit. + * + * @throws InvalidIcebergSnapshotException if any deleted snapshot is still referenced by a + * branch or tag + */ + private void validateDeletedSnapshotsNotReferenced() { + Map> referencedIdsToRefs = + providedRefs.entrySet().stream() + .collect( + Collectors.groupingBy( + e -> e.getValue().snapshotId(), + Collectors.mapping(Map.Entry::getKey, Collectors.toList()))); + + List invalidDeleteDetails = + deletedIds.stream() + .filter(referencedIdsToRefs::containsKey) + .map( + id -> + String.format( + "snapshot %s (referenced by: %s)", + id, String.join(", ", referencedIdsToRefs.get(id)))) + .collect(Collectors.toList()); + + if (!invalidDeleteDetails.isEmpty()) { + throw new InvalidIcebergSnapshotException( + String.format( + "Cannot delete snapshots that are still referenced by branches/tags: %s", + String.join("; ", invalidDeleteDetails))); + } + } + + TableMetadata apply() { + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(this.providedMetadata); + + /** + * Apply categorized snapshots to metadata: + * + *

[1] Staged (WAP) snapshots - added without branch reference + * + *

[2] New main branch snapshots - added and moved to MAIN branch incrementally + * + *

[3] Cherry-picked snapshots - existing snapshots, final branch pointer set below + * + *

We trust the client-provided order rather than sorting. Sequence numbers are + * monotonically increasing along a branch's lineage (following parent pointers) for both + * cherry-pick result snapshots and fast-forward snapshots. Iceberg's setBranchSnapshot() + * validates sequence numbers, so we can rely on its built-in validation. + */ + // Add staged snapshots in client-provided order + this.newStagedSnapshots.forEach(metadataBuilder::addSnapshot); + + // Add new main branch snapshots and move MAIN pointer incrementally + // This works for both: + // - Regular commits: newly created snapshots + // - Cherry-pick results: newly created snapshots with SOURCE_SNAPSHOT_ID_PROP + for (Snapshot snapshot : this.newMainBranchSnapshots) { + metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH); + } + + // Set final branch pointer using providedRefs if present + // This handles fast-forward for cherry-pick/WAP publish where we're moving the branch + // to an existing snapshot + SnapshotRef mainBranchRef = this.providedRefs.get(SnapshotRef.MAIN_BRANCH); + if (mainBranchRef != null) { + long newSnapshotId = mainBranchRef.snapshotId(); + metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH); + } + + // Delete snapshots + if (!this.deletedSnapshots.isEmpty()) { + Set snapshotIds = + this.deletedSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + metadataBuilder.removeSnapshots(snapshotIds); + } + + // Record snapshot IDs in properties and cleanup input properties + if (this.appendedCount > 0) { + metadataBuilder.setProperties( + Collections.singletonMap( + getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS), + formatSnapshotIds(this.newMainBranchSnapshots))); + } + if (!this.newStagedSnapshots.isEmpty()) { + metadataBuilder.setProperties( + Collections.singletonMap( + getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS), + formatSnapshotIds(this.newStagedSnapshots))); + } + if (!this.cherryPickedSnapshots.isEmpty()) { + metadataBuilder.setProperties( + Collections.singletonMap( + getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS), + formatSnapshotIds(this.cherryPickedSnapshots))); + } + if (!this.deletedSnapshots.isEmpty()) { + metadataBuilder.setProperties( + Collections.singletonMap( + getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS), + formatSnapshotIds(this.deletedSnapshots))); + } + metadataBuilder.removeProperties( + new HashSet<>( + Arrays.asList( + CatalogConstants.SNAPSHOTS_JSON_KEY, CatalogConstants.SNAPSHOTS_REFS_KEY))); + + return metadataBuilder.build(); + } + + void recordMetrics() { + // Record metrics for appended snapshots (includes regular commits and cherry-pick results) + // Note: cherryPickedSnapshots list contains existing source snapshots, not the new results + recordMetricWithDatabaseTag( + InternalCatalogMetricsConstant.SNAPSHOTS_ADDED_CTR, this.appendedCount); + recordMetricWithDatabaseTag( + InternalCatalogMetricsConstant.SNAPSHOTS_STAGED_CTR, this.newStagedSnapshots.size()); + recordMetricWithDatabaseTag( + InternalCatalogMetricsConstant.SNAPSHOTS_CHERRY_PICKED_CTR, + this.cherryPickedSnapshots.size()); + recordMetricWithDatabaseTag( + InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR, this.deletedSnapshots.size()); + } + + /** + * Helper method to record a metric with database tag if count is greater than zero. + * + * @param metricName The name of the metric to record + * @param count The count value to record + */ + private void recordMetricWithDatabaseTag(String metricName, int count) { + if (count > 0) { + // Only add database tag if databaseId is present; otherwise record metric without tag + if (this.databaseId != null) { + this.metricsReporter.count( + metricName, count, InternalCatalogMetricsConstant.DATABASE_TAG, this.databaseId); + } else { + this.metricsReporter.count(metricName, count); + } + } + } + + /** + * Helper method to format a list of snapshots into a comma-separated string of snapshot IDs. + * + * @param snapshots List of snapshots to format + * @return Comma-separated string of snapshot IDs + */ + private static String formatSnapshotIds(List snapshots) { + return snapshots.stream() + .map(s -> Long.toString(s.snapshotId())) + .collect(Collectors.joining(",")); + } + } +} diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotInspector.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotInspector.java deleted file mode 100644 index dc7dd06c2..000000000 --- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotInspector.java +++ /dev/null @@ -1,96 +0,0 @@ -package com.linkedin.openhouse.internal.catalog; - -import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException; -import java.io.UncheckedIOException; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.stream.StreamSupport; -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * A inspector class providing functionalities that inspect components of {@link Snapshot} provided - * by clients and decide if OpenHouse need to take additional steps to incorporate it or decide - * whether to incorporate at all. - * - *

Instance of this class will be injected into {@link OpenHouseInternalTableOperations} in - * runtime. - */ -@Component -public class SnapshotInspector { - @Autowired private Consumer> fileSecurer; - /** - * TODO: ADD Validation for snapshot: Sequence-number based, schema-id based, see iceberg spec for - * details. Throwing exceptions when failures occurred. - * - * @param providedSnapshot deserialized {@link Snapshot} object that clients provided. - * @throws InvalidIcebergSnapshotException Exception thrown from the process validating the - * snapshot provided by client. - */ - void validateSnapshot(Snapshot providedSnapshot) throws InvalidIcebergSnapshotException { - // TODO: Fill this method. - } - - void validateSnapshotsUpdate( - TableMetadata metadata, List addedSnapshots, List deletedSnapshots) { - if (metadata.currentSnapshot() == null) { - // no need to verify attempt to delete current snapshot if it doesn't exist - // deletedSnapshots is necessarily empty when original snapshots list is empty - return; - } - if (!addedSnapshots.isEmpty()) { - // latest snapshot can be deleted if new snapshots are added. - return; - } - long latestSnapshotId = metadata.currentSnapshot().snapshotId(); - if (!deletedSnapshots.isEmpty() - && deletedSnapshots.get(deletedSnapshots.size() - 1).snapshotId() == latestSnapshotId) { - throw new InvalidIcebergSnapshotException( - String.format("Cannot delete the latest snapshot %s", latestSnapshotId)); - } - } - - /** - * A sister method to {@link #validateSnapshot(Snapshot)} that change the file-level permission to - * be OpenHouse exclusive to avoid unexpected changes from unauthorized parties. Throwing - * exceptions when failures occurred. - * - * @param providedSnapshot deserialized {@link Snapshot} object that clients provided. - * @param fileIO {@link FileIO} object - * @throws UncheckedIOException Exception thrown from the process securing the files associated - * with {@param providedSnapshot}. - */ - @VisibleForTesting - void secureSnapshot(Snapshot providedSnapshot, FileIO fileIO) throws UncheckedIOException { - secureDataFile(providedSnapshot.addedDataFiles(fileIO)); - secureDeleteFile(providedSnapshot.addedDeleteFiles(fileIO)); - secureManifestFile(providedSnapshot.allManifests(fileIO)); - } - - private void secureDataFile(Iterable dataFiles) { - StreamSupport.stream(dataFiles.spliterator(), false) - .map(x -> (Supplier) (() -> new Path(x.path().toString()))) - .forEach(fileSecurer); - } - - private void secureDeleteFile(Iterable deleteFiles) { - StreamSupport.stream(deleteFiles.spliterator(), false) - .map(x -> (Supplier) (() -> new Path(x.path().toString()))) - .forEach(fileSecurer); - } - - private void secureManifestFile(List manifestFiles) throws UncheckedIOException { - manifestFiles.stream() - .map(x -> (Supplier) (() -> new Path(x.path()))) - .forEach(fileSecurer); - } -} diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/IcebergTestUtil.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/IcebergTestUtil.java index d4fd6efaa..cdedb3e93 100644 --- a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/IcebergTestUtil.java +++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/IcebergTestUtil.java @@ -44,15 +44,14 @@ private static List loadSnapshots(String snapshotFile) throws IOExcept return SnapshotsUtil.parseSnapshots(null, data); } - public static Map obtainSnapshotRefsFromSnapshot(Snapshot snapshot) { + public static Map createMainBranchRefPointingTo(Snapshot snapshot) { Map snapshotRefs = new HashMap<>(); SnapshotRef snapshotRef = SnapshotRef.branchBuilder(snapshot.snapshotId()).build(); snapshotRefs.put(SnapshotRef.MAIN_BRANCH, SnapshotRefParser.toJson(snapshotRef)); return snapshotRefs; } - public static Map obtainSnapshotRefsFromSnapshot( - Snapshot snapshot, String branch) { + public static Map createBranchRefPointingTo(Snapshot snapshot, String branch) { Map snapshotRefs = new HashMap<>(); SnapshotRef snapshotRef = SnapshotRef.branchBuilder(snapshot.snapshotId()).build(); snapshotRefs.put(branch, SnapshotRefParser.toJson(snapshotRef)); diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java index f484b60ae..69b4027b9 100644 --- a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java +++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java @@ -8,6 +8,7 @@ import com.linkedin.openhouse.cluster.storage.StorageType; import com.linkedin.openhouse.cluster.storage.local.LocalStorage; import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient; +import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException; import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager; import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper; import com.linkedin.openhouse.internal.catalog.model.HouseTable; @@ -41,6 +42,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SortDirection; import org.apache.iceberg.SortOrder; import org.apache.iceberg.TableMetadata; @@ -100,32 +102,41 @@ void setup() { Mockito.when(mockHouseTableMapper.toHouseTable(Mockito.any(TableMetadata.class), Mockito.any())) .thenReturn(mockHouseTable); HadoopFileIO fileIO = new HadoopFileIO(new Configuration()); + MetricsReporter metricsReporter = + new MetricsReporter(new SimpleMeterRegistry(), "TEST_CATALOG", Lists.newArrayList()); + SnapshotDiffApplier snapshotDiffApplier = new SnapshotDiffApplier(metricsReporter); openHouseInternalTableOperations = new OpenHouseInternalTableOperations( mockHouseTableRepository, fileIO, - Mockito.mock(SnapshotInspector.class), mockHouseTableMapper, TEST_TABLE_IDENTIFIER, - new MetricsReporter(new SimpleMeterRegistry(), "TEST_CATALOG", Lists.newArrayList()), - fileIOManager); + metricsReporter, + fileIOManager, + snapshotDiffApplier); // Create a separate instance with mock metrics reporter for testing metrics + SnapshotDiffApplier snapshotDiffApplierWithMockMetrics = + new SnapshotDiffApplier(mockMetricsReporter); openHouseInternalTableOperationsWithMockMetrics = new OpenHouseInternalTableOperations( mockHouseTableRepository, fileIO, - Mockito.mock(SnapshotInspector.class), mockHouseTableMapper, TEST_TABLE_IDENTIFIER, mockMetricsReporter, - fileIOManager); + fileIOManager, + snapshotDiffApplierWithMockMetrics); LocalStorage localStorage = mock(LocalStorage.class); when(fileIOManager.getStorage(fileIO)).thenReturn(localStorage); when(localStorage.getType()).thenReturn(StorageType.LOCAL); } + /** + * Tests committing snapshots to a table with no existing snapshots (initial version). Verifies + * that all snapshots are appended and tracked in table properties. + */ @Test void testDoCommitAppendSnapshotsInitialVersion() throws IOException { List testSnapshots = IcebergTestUtil.getSnapshots(); @@ -137,7 +148,7 @@ void testDoCommitAppendSnapshotsInitialVersion() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot( + IcebergTestUtil.createMainBranchRefPointingTo( testSnapshots.get(testSnapshots.size() - 1)))); TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(properties); @@ -161,6 +172,10 @@ void testDoCommitAppendSnapshotsInitialVersion() throws IOException { } } + /** + * Tests committing additional snapshots to a table that already has existing snapshots. Verifies + * that only new snapshots are appended and tracked appropriately. + */ @Test void testDoCommitAppendSnapshotsExistingVersion() throws IOException { List testSnapshots = IcebergTestUtil.getSnapshots(); @@ -178,7 +193,7 @@ void testDoCommitAppendSnapshotsExistingVersion() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot( + IcebergTestUtil.createMainBranchRefPointingTo( testSnapshots.get(testSnapshots.size() - 1)))); properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); @@ -190,7 +205,7 @@ void testDoCommitAppendSnapshotsExistingVersion() throws IOException { Assertions.assertEquals( 5, updatedProperties - .size()); /*write.parquet.compression-codec, location, lastModifiedTime, version and deleted_snapshots*/ + .size()); /*write.parquet.compression-codec, location, lastModifiedTime, version and appended_snapshots*/ Assertions.assertEquals( TEST_LOCATION, updatedProperties.get(getCanonicalFieldName("tableVersion"))); @@ -205,6 +220,10 @@ void testDoCommitAppendSnapshotsExistingVersion() throws IOException { } } + /** + * Tests committing changes that both append new snapshots and delete existing ones. Verifies that + * both appended and deleted snapshots are correctly tracked in properties. + */ @Test void testDoCommitAppendAndDeleteSnapshots() throws IOException { List testSnapshots = IcebergTestUtil.getSnapshots(); @@ -229,7 +248,7 @@ void testDoCommitAppendAndDeleteSnapshots() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot( + IcebergTestUtil.createMainBranchRefPointingTo( newSnapshots.get(newSnapshots.size() - 1)))); properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); @@ -263,6 +282,10 @@ void testDoCommitAppendAndDeleteSnapshots() throws IOException { } } + /** + * Tests that metadata file updates are performed for replicated table initial version commits. + * Verifies that updateMetadataField is called with the correct parameters for replicated tables. + */ @Test void testDoCommitUpdateMetadataForInitalVersionCommit() throws IOException { Map properties = new HashMap<>(); @@ -323,6 +346,10 @@ void testDoCommitUpdateMetadataForInitalVersionCommit() throws IOException { verify(mockLocalStorageClient).getNativeClient(); } + /** + * Tests that metadata file updates are not performed for non-replicated tables. Verifies that + * updateMetadataField is never called when the table is not replicated. + */ @Test void testDoCommitUpdateMetadataNotCalledForNonReplicatedTable() throws IOException { Map properties = new HashMap<>(); @@ -349,6 +376,10 @@ void testDoCommitUpdateMetadataNotCalledForNonReplicatedTable() throws IOExcepti Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.any(HouseTable.class)); } + /** + * Tests that metadata file updates are not performed for non-initial version commits. Verifies + * that updateMetadataField is only called during table creation, not for subsequent updates. + */ @Test void testDoCommitUpdateMetadataNotCalledForNonInitialVersionCommit() throws IOException { Map properties = new HashMap<>(); @@ -382,6 +413,10 @@ void testDoCommitUpdateMetadataNotCalledForNonInitialVersionCommit() throws IOEx Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.any(HouseTable.class)); } + /** + * Tests committing changes that delete some snapshots while keeping others. Verifies that deleted + * snapshots are properly tracked in table properties. + */ @Test void testDoCommitDeleteSnapshots() throws IOException { List testSnapshots = IcebergTestUtil.getSnapshots(); @@ -403,7 +438,7 @@ void testDoCommitDeleteSnapshots() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot( + IcebergTestUtil.createMainBranchRefPointingTo( testSnapshots.get(testSnapshots.size() - 1)))); properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); @@ -430,6 +465,10 @@ void testDoCommitDeleteSnapshots() throws IOException { } } + /** + * Tests that commits to staged tables do not persist to the repository. Verifies that table + * metadata is set locally but save() and findById() are never called. + */ @Test void testDoCommitDoesntPersistForStagedTable() { TableMetadata metadata = @@ -451,6 +490,106 @@ void testDoCommitDoesntPersistForStagedTable() { .get()); } + /** + * Tests staged table creation with no snapshots (initial version). Verifies that the table + * metadata is set locally but no persistence occurs to the repository. + */ + @Test + void testStagedTableCreationWithoutSnapshots() throws IOException { + Map properties = new HashMap<>(BASE_TABLE_METADATA.properties()); + properties.put(CatalogConstants.IS_STAGE_CREATE_KEY, "true"); + + TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(properties); + + try (MockedStatic ignoreWriteMock = + Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) { + openHouseInternalTableOperations.doCommit(null, metadata); + + // Verify TableMetadata is set locally + Assertions.assertNotNull(openHouseInternalTableOperations.currentMetadataLocation()); + Assertions.assertNotNull(openHouseInternalTableOperations.current()); + + // Verify no snapshots were added + Assertions.assertEquals(0, openHouseInternalTableOperations.current().snapshots().size()); + + // Verify no persistence to repository + verify(mockHouseTableRepository, times(0)).save(any()); + + // Verify no snapshot properties were set + Map resultProperties = + openHouseInternalTableOperations.current().properties(); + Assertions.assertNull(resultProperties.get(getCanonicalFieldName("appended_snapshots"))); + Assertions.assertNull(resultProperties.get(getCanonicalFieldName("staged_snapshots"))); + Assertions.assertNull(resultProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); + Assertions.assertNull(resultProperties.get(getCanonicalFieldName("deleted_snapshots"))); + } + } + + /** + * Tests staged table creation with staged (WAP) snapshots. Verifies that staged snapshots are + * added to the table but no persistence occurs to the repository. + */ + @Test + void testStagedTableCreationWithStagedSnapshots() throws IOException { + List testWapSnapshots = IcebergTestUtil.getWapSnapshots().subList(0, 2); + Map properties = new HashMap<>(BASE_TABLE_METADATA.properties()); + properties.put(CatalogConstants.IS_STAGE_CREATE_KEY, "true"); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(testWapSnapshots)); + + TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(properties); + + try (MockedStatic ignoreWriteMock = + Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) { + openHouseInternalTableOperations.doCommit(null, metadata); + + // Verify TableMetadata is set locally + Assertions.assertNotNull(openHouseInternalTableOperations.currentMetadataLocation()); + Assertions.assertNotNull(openHouseInternalTableOperations.current()); + + // Verify staged snapshots were added + TableMetadata currentMetadata = openHouseInternalTableOperations.current(); + Assertions.assertEquals( + testWapSnapshots.size(), + currentMetadata.snapshots().size(), + "Staged snapshots should be added"); + + // Verify all snapshots are staged (have WAP ID) + for (Snapshot snapshot : currentMetadata.snapshots()) { + Assertions.assertTrue( + snapshot.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP), + "All snapshots should be staged with WAP ID"); + } + + // Verify no branch references exist (staged snapshots should not be on main) + Assertions.assertTrue( + currentMetadata.refs().isEmpty() + || !currentMetadata.refs().containsKey(SnapshotRef.MAIN_BRANCH), + "Staged snapshots should not have main branch reference"); + + // Verify no persistence to repository + verify(mockHouseTableRepository, times(0)).save(any()); + + // Verify snapshot properties tracking + Map resultProperties = currentMetadata.properties(); + Assertions.assertEquals( + testWapSnapshots.stream() + .map(s -> Long.toString(s.snapshotId())) + .collect(Collectors.joining(",")), + resultProperties.get(getCanonicalFieldName("staged_snapshots")), + "Staged snapshots should be tracked in properties"); + Assertions.assertNull( + resultProperties.get(getCanonicalFieldName("appended_snapshots")), + "No snapshots should be appended to main"); + Assertions.assertNull(resultProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); + Assertions.assertNull(resultProperties.get(getCanonicalFieldName("deleted_snapshots"))); + } + } + + /** + * Tests that repository exceptions are properly converted to Iceberg exceptions. Verifies that + * various repository exceptions map to CommitFailedException or CommitStateUnknownException. + */ @Test void testDoCommitExceptionHandling() { TableMetadata base = BASE_TABLE_METADATA; @@ -479,38 +618,61 @@ void testDoCommitExceptionHandling() { () -> openHouseInternalTableOperations.doCommit(base, metadata)); } + /** + * Tests that attempting to delete a snapshot that is still referenced by a branch throws an + * exception. Verifies that InvalidIcebergSnapshotException is thrown when snapshot refs conflict + * with deletions. + */ @Test - void testDoCommitSnapshotsValidationExceptionHandling() throws IOException { + void testDoCommitSnapshotsValidationThrowsException() throws IOException { TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(ImmutableMap.of("random", "value")); List testSnapshots = IcebergTestUtil.getSnapshots(); Map properties = new HashMap<>(metadata.properties()); + + // The key issue: SNAPSHOTS_JSON_KEY says to keep only snapshot 2, but snapshot 1 is referenced + // by main + // This creates a conflict - we're trying to delete snapshot 1 but it's still referenced properties.put( CatalogConstants.SNAPSHOTS_JSON_KEY, - SnapshotsUtil.serializedSnapshots(testSnapshots.subList(1, 3))); + SnapshotsUtil.serializedSnapshots( + testSnapshots.subList(2, 3))); // Only snapshot 2 should remain properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot( - testSnapshots.get(testSnapshots.size() - 1)))); + IcebergTestUtil.createMainBranchRefPointingTo( + testSnapshots.get(1)))); // But main refs snapshot 1 properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); metadata = metadata.replaceProperties(properties); + + // Create initial metadata with snapshots 1 and 2, where snapshot 1 is referenced by main TableMetadata metadataWithSnapshots = TableMetadata.buildFrom(metadata) - .setBranchSnapshot(testSnapshots.get(0), SnapshotRef.MAIN_BRANCH) - .setBranchSnapshot(testSnapshots.get(1), SnapshotRef.MAIN_BRANCH) + .setBranchSnapshot(testSnapshots.get(1), SnapshotRef.MAIN_BRANCH) // snapshot 1 -> main + .addSnapshot(testSnapshots.get(2)) // snapshot 2 exists but unreferenced initially .build(); + + // Target metadata tries to delete snapshot 1 (not in SNAPSHOTS_JSON_KEY) but main still refs it TableMetadata metadataWithSnapshotsDeleted = TableMetadata.buildFrom(metadata) - .setBranchSnapshot(testSnapshots.get(3), SnapshotRef.MAIN_BRANCH) + .setBranchSnapshot( + testSnapshots.get(1), SnapshotRef.MAIN_BRANCH) // main still points to snapshot 1 .build(); - Assertions.assertDoesNotThrow( + // This should throw exception because snapshot 1 is marked for deletion but still referenced by + // main + Assertions.assertThrows( + InvalidIcebergSnapshotException.class, () -> openHouseInternalTableOperations.doCommit( - metadataWithSnapshots, metadataWithSnapshotsDeleted)); + metadataWithSnapshots, metadataWithSnapshotsDeleted), + "Should throw exception when trying to delete referenced snapshots"); } + /** + * Tests committing WAP (write-audit-publish) staged snapshots to an initial version table. + * Verifies that snapshots are marked as staged but not appended to the main branch. + */ @Test void testDoCommitAppendStageOnlySnapshotsInitialVersion() throws IOException { List testWapSnapshots = IcebergTestUtil.getWapSnapshots().subList(0, 2); @@ -532,16 +694,18 @@ void testDoCommitAppendStageOnlySnapshotsInitialVersion() throws IOException { .map(s -> Long.toString(s.snapshotId())) .collect(Collectors.joining(",")), updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); + Assertions.assertNull( + updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable)); } } + /** + * Tests committing WAP staged snapshots to a table with existing snapshots. Verifies that new + * snapshots are tracked as staged without being appended to main. + */ @Test void testDoCommitAppendStageOnlySnapshotsExistingVersion() throws IOException { List testSnapshots = IcebergTestUtil.getSnapshots(); @@ -563,7 +727,7 @@ void testDoCommitAppendStageOnlySnapshotsExistingVersion() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot(newSnapshots.get(0)))); + IcebergTestUtil.createMainBranchRefPointingTo(newSnapshots.get(0)))); properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); TableMetadata metadata = base.replaceProperties(properties); @@ -577,60 +741,66 @@ void testDoCommitAppendStageOnlySnapshotsExistingVersion() throws IOException { .map(s -> Long.toString(s.snapshotId())) .collect(Collectors.joining(",")), updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); + Assertions.assertNull( + updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable)); } } - @Test - void testDoCommitAppendSnapshotsToNonMainBranch() throws IOException { - List testSnapshots = IcebergTestUtil.getSnapshots(); - Map properties = new HashMap<>(BASE_TABLE_METADATA.properties()); - try (MockedStatic ignoreWriteMock = - Mockito.mockStatic(TableMetadataParser.class)) { - properties.put( - CatalogConstants.SNAPSHOTS_JSON_KEY, - SnapshotsUtil.serializedSnapshots(testSnapshots.subList(0, 1))); - properties.put( - CatalogConstants.SNAPSHOTS_REFS_KEY, - SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot(testSnapshots.get(0), "branch"))); - properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); - - TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(properties); - // verify throw an error when committing to non-main branch. - Assertions.assertThrows( - CommitStateUnknownException.class, - () -> openHouseInternalTableOperations.doCommit(BASE_TABLE_METADATA, metadata)); - } - } - + /** + * Tests validation that rejects appending snapshots older than the current metadata timestamp. + * Verifies that IllegalArgumentException is thrown for stale snapshots unless newer ones are + * included. + */ @Test void testAppendSnapshotsWithOldSnapshots() throws IOException { - TableMetadata metadata = + // Create base metadata (existing table state) + TableMetadata baseMetadata = TableMetadata.buildFrom(BASE_TABLE_METADATA) - .setPreviousFileLocation("tmp_location") + .setPreviousFileLocation("tmp_location") // this is key .setLocation(BASE_TABLE_METADATA.metadataFileLocation()) .build(); + // all snapshots are from the past and snapshots add should fail the validation List snapshots = IcebergTestUtil.getSnapshots(); + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(snapshots)); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergTestUtil.createMainBranchRefPointingTo(snapshots.get(snapshots.size() - 1)))); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + Assertions.assertThrows( IllegalArgumentException.class, () -> - openHouseInternalTableOperations.maybeAppendSnapshots( - metadata, snapshots, ImmutableMap.of(), false)); + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadata)); + // the latest snapshots have larger timestamp than the previous metadata timestamp, so it should // pass the validation snapshots.addAll(IcebergTestUtil.getFutureSnapshots()); - openHouseInternalTableOperations.maybeAppendSnapshots( - metadata, snapshots, ImmutableMap.of(), false); + Map propertiesWithFuture = new HashMap<>(baseMetadata.properties()); + propertiesWithFuture.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(snapshots)); + propertiesWithFuture.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergTestUtil.createMainBranchRefPointingTo(snapshots.get(snapshots.size() - 1)))); + + TableMetadata newMetadataWithFuture = baseMetadata.replaceProperties(propertiesWithFuture); + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadataWithFuture); } + /** + * Tests cherry-picking a staged snapshot to main when the base snapshot hasn't changed. Verifies + * that the existing staged snapshot is promoted without creating a new snapshot. + */ @Test void testDoCommitCherryPickSnapshotBaseUnchanged() throws IOException { List testSnapshots = IcebergTestUtil.getSnapshots(); @@ -653,7 +823,7 @@ void testDoCommitCherryPickSnapshotBaseUnchanged() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot(testWapSnapshots.get(0)))); + IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(0)))); properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); TableMetadata metadata = base.replaceProperties(properties); @@ -662,19 +832,20 @@ void testDoCommitCherryPickSnapshotBaseUnchanged() throws IOException { Map updatedProperties = tblMetadataCaptor.getValue().properties(); // verify the staged snapshot is cherry picked by use the existing one - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); Assertions.assertEquals( Long.toString(testWapSnapshots.get(0).snapshotId()), updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable)); } } + /** + * Tests cherry-picking a staged snapshot when the base has changed since staging. Verifies that a + * new snapshot is created and appended to track the rebased changes. + */ @Test void testDoCommitCherryPickSnapshotBaseChanged() throws IOException { List testWapSnapshots = IcebergTestUtil.getWapSnapshots(); @@ -687,13 +858,13 @@ void testDoCommitCherryPickSnapshotBaseChanged() throws IOException { Map properties = new HashMap<>(base.properties()); try (MockedStatic ignoreWriteMock = Mockito.mockStatic(TableMetadataParser.class)) { - // cherry pick the staged snapshot whose base has changed + // cherry-pick the staged snapshot whose base has changed properties.put( CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(testWapSnapshots)); properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot( + IcebergTestUtil.createMainBranchRefPointingTo( testWapSnapshots.get(2)))); // new snapshot properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); @@ -702,21 +873,23 @@ void testDoCommitCherryPickSnapshotBaseChanged() throws IOException { Mockito.verify(mockHouseTableMapper).toHouseTable(tblMetadataCaptor.capture(), Mockito.any()); Map updatedProperties = tblMetadataCaptor.getValue().properties(); - // verify the staged snapshot is cherry picked by creating a new snapshot and append it - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); + // verify the staged snapshot is cherry-picked by creating a new snapshot and append it + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); Assertions.assertEquals( Long.toString(testWapSnapshots.get(2).snapshotId()), updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); Assertions.assertEquals( Long.toString(testWapSnapshots.get(1).snapshotId()), updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable)); } } + /** + * Tests cherry-picking the first staged snapshot (with no parent) to the main branch. Verifies + * that the staged snapshot is promoted directly without creating a new snapshot. + */ @Test void testDoCommitCherryPickFirstSnapshot() throws IOException { List testWapSnapshots = IcebergTestUtil.getWapSnapshots().subList(0, 1); @@ -732,7 +905,7 @@ void testDoCommitCherryPickFirstSnapshot() throws IOException { properties.put( CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap( - IcebergTestUtil.obtainSnapshotRefsFromSnapshot(testWapSnapshots.get(0)))); + IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(0)))); properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); TableMetadata metadata = base.replaceProperties(properties); @@ -741,19 +914,20 @@ void testDoCommitCherryPickFirstSnapshot() throws IOException { Map updatedProperties = tblMetadataCaptor.getValue().properties(); // verify the staged snapshot is cherry picked by using the existing one - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); Assertions.assertEquals( Long.toString(testWapSnapshots.get(0).snapshotId()), updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable)); } } + /** + * Tests deleting the last staged snapshot when no references point to it. Verifies that no + * snapshot operations are tracked since the snapshot was unreferenced. + */ @Test void testDoCommitDeleteLastStagedSnapshotWhenNoRefs() throws IOException { List testWapSnapshots = IcebergTestUtil.getWapSnapshots().subList(0, 1); @@ -772,18 +946,19 @@ void testDoCommitDeleteLastStagedSnapshotWhenNoRefs() throws IOException { Map updatedProperties = tblMetadataCaptor.getValue().properties(); // verify nothing happens - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); - Assertions.assertEquals( - null, updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("staged_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("appended_snapshots"))); + Assertions.assertNull( + updatedProperties.get(getCanonicalFieldName("cherry_picked_snapshots"))); + Assertions.assertNull(updatedProperties.get(getCanonicalFieldName("deleted_snapshots"))); Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable)); } } + /** + * Tests rebuilding an unpartitioned table's partition spec with a new schema. Verifies that the + * rebuilt spec remains unpartitioned. + */ @Test void testRebuildPartitionSpecUnpartitioned() { Schema originalSchema = @@ -798,6 +973,10 @@ void testRebuildPartitionSpecUnpartitioned() { Assertions.assertTrue(rebuiltSpec.isUnpartitioned()); } + /** + * Tests rebuilding partition spec when the new schema has the same field IDs as the original. + * Verifies that partition fields are correctly mapped using matching field IDs. + */ @Test void testRebuildPartitionSpec_NewSchemaSameFieldIds() { Schema originalSchema = @@ -835,6 +1014,11 @@ void testRebuildPartitionSpec_NewSchemaSameFieldIds() { Assertions.assertEquals(3, rebuiltSpec.fields().get(2).sourceId()); } + /** + * Tests rebuilding partition spec when the new schema has different field IDs for same field + * names. Verifies that partition fields are correctly remapped to new field IDs based on field + * names. + */ @Test void testRebuildPartitionSpec_NewSchemaDifferentFieldIds() { Schema originalSchema = @@ -880,6 +1064,10 @@ void testRebuildPartitionSpec_NewSchemaDifferentFieldIds() { Assertions.assertEquals(2, rebuiltSpec.fields().get(2).sourceId()); } + /** + * Tests rebuilding partition spec when a partition field is missing from the new schema. Verifies + * that an IllegalArgumentException is thrown for the missing field. + */ @Test void testRebuildPartitionSpec_fieldMissingInNewSchema() { Schema originalSchema = @@ -901,6 +1089,10 @@ void testRebuildPartitionSpec_fieldMissingInNewSchema() { "Field field1 does not exist in the new schema", exception.getMessage()); } + /** + * Tests rebuilding sort order when the new schema has the same field IDs as the original. + * Verifies that sort fields are correctly mapped using matching field IDs. + */ @Test void testRebuildSortOrder_NewSchemaSameFieldIds() { Schema originalSchema = @@ -927,6 +1119,10 @@ void testRebuildSortOrder_NewSchemaSameFieldIds() { Assertions.assertEquals(2, rebuiltSortOrder.fields().get(1).sourceId()); } + /** + * Tests rebuilding sort order when the new schema has different field IDs for same field names. + * Verifies that sort fields are correctly remapped to new field IDs based on field names. + */ @Test void testRebuildSortOrder_NewSchemaDifferentFieldIds() { Schema originalSchema = @@ -953,6 +1149,10 @@ void testRebuildSortOrder_NewSchemaDifferentFieldIds() { Assertions.assertEquals(1, rebuiltSortOrder.fields().get(1).sourceId()); } + /** + * Tests rebuilding sort order when a sort field is missing from the new schema. Verifies that an + * IllegalArgumentException is thrown for the missing field. + */ @Test void testRebuildSortOrder_fieldMissingInNewSchema() { Schema originalSchema = @@ -971,6 +1171,10 @@ void testRebuildSortOrder_fieldMissingInNewSchema() { "Field field1 does not exist in the new schema", exception.getMessage()); } + /** + * Tests that refresh metadata operations record metrics with database tag but not table tag. + * Verifies that only the database dimension is included to avoid high cardinality. + */ @Test void testRefreshMetadataIncludesDatabaseTag() { testMetricIncludesDatabaseTag( @@ -980,6 +1184,10 @@ void testRefreshMetadataIncludesDatabaseTag() { "Timer should not have table tag (removed because the table tag has super high cardinality and overloads metric emission max size)"); } + /** + * Tests that commit metadata update operations record metrics with database tag but not table + * tag. Verifies that only the database dimension is included to avoid high cardinality. + */ @Test void testCommitMetadataUpdateIncludesDatabaseTag() { testMetricIncludesDatabaseTag( @@ -989,6 +1197,10 @@ void testCommitMetadataUpdateIncludesDatabaseTag() { "Timer should not have table tag (only database dimension should be included)"); } + /** + * Tests that refresh metadata latency timer has histogram buckets configured. Verifies that the + * metrics can be used for histogram-based monitoring and alerting. + */ @Test void testRefreshMetadataLatencyHasHistogramBuckets() { testMetricHasHistogramBuckets( @@ -997,6 +1209,10 @@ void testRefreshMetadataLatencyHasHistogramBuckets() { this::executeRefreshMetadata); } + /** + * Tests that commit metadata update latency timer has histogram buckets configured. Verifies that + * the metrics can be used for histogram-based monitoring and alerting. + */ @Test void testCommitMetadataUpdateLatencyHasHistogramBuckets() { testMetricHasHistogramBuckets( @@ -1023,17 +1239,19 @@ private void testMetricIncludesDatabaseTag( SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); MetricsReporter realMetricsReporter = new MetricsReporter(meterRegistry, "TEST_CATALOG", Lists.newArrayList()); + HadoopFileIO fileIO = new HadoopFileIO(new Configuration()); + SnapshotDiffApplier snapshotDiffApplier = new SnapshotDiffApplier(realMetricsReporter); // Create instance with real metrics reporter OpenHouseInternalTableOperations operationsWithRealMetrics = new OpenHouseInternalTableOperations( mockHouseTableRepository, - new HadoopFileIO(new Configuration()), - Mockito.mock(SnapshotInspector.class), + fileIO, mockHouseTableMapper, TEST_TABLE_IDENTIFIER, realMetricsReporter, - fileIOManager); + fileIOManager, + snapshotDiffApplier); // Setup test-specific mocks setupFunction.accept(operationsWithRealMetrics); @@ -1086,17 +1304,19 @@ private void testMetricHasHistogramBuckets( MetricsReporter realMetricsReporter = new MetricsReporter(meterRegistry, "TEST_CATALOG", Lists.newArrayList()); + HadoopFileIO fileIO = new HadoopFileIO(new Configuration()); + SnapshotDiffApplier snapshotDiffApplier = new SnapshotDiffApplier(realMetricsReporter); // Create instance with real metrics reporter OpenHouseInternalTableOperations operationsWithRealMetrics = new OpenHouseInternalTableOperations( mockHouseTableRepository, - new HadoopFileIO(new Configuration()), - Mockito.mock(SnapshotInspector.class), + fileIO, mockHouseTableMapper, TEST_TABLE_IDENTIFIER, realMetricsReporter, - fileIOManager); + fileIOManager, + snapshotDiffApplier); // Setup test-specific mocks setupFunction.accept(operationsWithRealMetrics); @@ -1225,4 +1445,395 @@ private void verifyMetricHistogramBuckets( Assertions.assertFalse(Double.isNaN(totalTime), "Timer total time should not be NaN"); Assertions.assertFalse(Double.isNaN(maxTime), "Timer max time should not be NaN"); } + + /** + * Tests that unreferenced snapshots can be successfully deleted from the table. Verifies that + * deleted snapshots are removed from metadata and tracked in properties. + */ + @Test + void testDeleteSnapshotWithNoReference() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata with multiple snapshots + TableMetadata baseMetadata = + TableMetadata.buildFrom(BASE_TABLE_METADATA) + .addSnapshot(testSnapshots.get(0)) // Unreferenced - can be deleted + .addSnapshot(testSnapshots.get(1)) // Unreferenced - can be deleted + .addSnapshot(testSnapshots.get(2)) // Unreferenced - can be deleted + .setBranchSnapshot( + testSnapshots.get(3), SnapshotRef.MAIN_BRANCH) // Referenced - cannot be deleted + .build(); + + // Delete unreferenced snapshots (first two snapshots) + // New metadata keeps snapshots 2 and 3 + Snapshot referencedSnapshot = testSnapshots.get(3); + List remainingSnapshots = testSnapshots.subList(2, 4); + + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(remainingSnapshots)); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergTestUtil.createMainBranchRefPointingTo(referencedSnapshot))); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + TableMetadata result = + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadata); + + // Verify unreferenced snapshots were removed + List unreferencedSnapshots = testSnapshots.subList(0, 2); + for (Snapshot unreferenced : unreferencedSnapshots) { + boolean snapshotExists = + result.snapshots().stream().anyMatch(s -> s.snapshotId() == unreferenced.snapshotId()); + Assertions.assertFalse( + snapshotExists, + "Unreferenced snapshot " + unreferenced.snapshotId() + " should be deleted"); + } + + // Verify referenced snapshot still exists + boolean referencedExists = + result.snapshots().stream() + .anyMatch(s -> s.snapshotId() == referencedSnapshot.snapshotId()); + Assertions.assertTrue(referencedExists, "Referenced snapshot should still exist"); + + // Verify deletion tracking + Map resultProperties = result.properties(); + String deletedSnapshotsStr = + resultProperties.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)); + Assertions.assertNotNull(deletedSnapshotsStr); + + for (Snapshot unreferenced : unreferencedSnapshots) { + Assertions.assertTrue( + deletedSnapshotsStr.contains(Long.toString(unreferenced.snapshotId())), + "Unreferenced snapshot should be tracked as deleted"); + } + } + + /** + * Tests that attempting to delete an empty list of snapshots makes no changes to the table. + * Verifies that no snapshots are deleted and no deletion properties are set. + */ + @Test + void testDeleteEmptySnapshotList() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata + TableMetadata baseMetadata = BASE_TABLE_METADATA; + for (Snapshot snapshot : testSnapshots) { + baseMetadata = + TableMetadata.buildFrom(baseMetadata) + .setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH) + .build(); + } + + // Delete empty list - new metadata is same as base (no snapshots deleted) + Snapshot lastSnapshot = testSnapshots.get(testSnapshots.size() - 1); + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, + SnapshotsUtil.serializedSnapshots(baseMetadata.snapshots())); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot))); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + TableMetadata result = + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadata); + + // Verify no changes were made + Assertions.assertEquals( + baseMetadata.snapshots().size(), + result.snapshots().size(), + "No snapshots should be deleted when list is empty"); + + // Verify no deletion tracking properties were added + Map resultProperties = result.properties(); + String deletedSnapshots = + resultProperties.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)); + Assertions.assertNull(deletedSnapshots, "No deleted snapshots property should be set"); + } + + /** + * Tests that attempting to delete a null list of snapshots makes no changes to the table. + * Verifies that no snapshots are deleted and no deletion properties are set. + */ + @Test + void testDeleteNullSnapshotList() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata + TableMetadata baseMetadata = BASE_TABLE_METADATA; + for (Snapshot snapshot : testSnapshots) { + baseMetadata = + TableMetadata.buildFrom(baseMetadata) + .setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH) + .build(); + } + + // Delete null list - new metadata is same as base (no snapshots deleted) + Snapshot lastSnapshot = testSnapshots.get(testSnapshots.size() - 1); + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, + SnapshotsUtil.serializedSnapshots(baseMetadata.snapshots())); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot))); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + TableMetadata result = + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadata); + + // Verify no changes were made + Assertions.assertEquals( + baseMetadata.snapshots().size(), + result.snapshots().size(), + "No snapshots should be deleted when list is null"); + + // Verify no deletion tracking properties were added + Map resultProperties = result.properties(); + String deletedSnapshots = + resultProperties.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)); + Assertions.assertNull(deletedSnapshots, "No deleted snapshots property should be set"); + } + + /** + * Tests that attempting to delete a snapshot that doesn't exist in the metadata has no effect. + * Verifies that snapshot count remains unchanged and no deletion tracking occurs. + */ + @Test + void testDeleteNonExistentSnapshot() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata + TableMetadata baseMetadata = BASE_TABLE_METADATA; + for (Snapshot snapshot : testSnapshots) { + baseMetadata = + TableMetadata.buildFrom(baseMetadata) + .setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH) + .build(); + } + + // Create a snapshot that doesn't exist in the metadata + List extraSnapshots = IcebergTestUtil.getExtraSnapshots(); + Snapshot nonExistentSnapshot = extraSnapshots.get(0); + + // New metadata is same as base (non-existent snapshot can't be removed) + Snapshot lastSnapshot = testSnapshots.get(testSnapshots.size() - 1); + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, + SnapshotsUtil.serializedSnapshots(baseMetadata.snapshots())); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot))); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + TableMetadata result = + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadata); + + // Verify original snapshots are unchanged + Assertions.assertEquals( + baseMetadata.snapshots().size(), + result.snapshots().size(), + "Snapshot count should be unchanged when deleting non-existent snapshot"); + + // Verify deletion is not tracked (since no actual deletion occurred) + Map resultProperties = result.properties(); + String deletedSnapshots = + resultProperties.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)); + Assertions.assertNull(deletedSnapshots, "No deleted snapshots should be tracked"); + } + + /** + * Tests that snapshot deletion operations record the correct metrics. Verifies that + * SNAPSHOTS_DELETED_CTR counter is incremented by the number of deleted snapshots. + */ + @Test + void testDeleteSnapshotMetricsRecorded() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata + TableMetadata baseMetadata = BASE_TABLE_METADATA; + for (Snapshot snapshot : testSnapshots) { + baseMetadata = TableMetadata.buildFrom(baseMetadata).addSnapshot(snapshot).build(); + } + + // Make baseMetadata effectively final for lambda usage + final TableMetadata finalBaseMetadata = baseMetadata; + + // Delete some snapshots (first two snapshots) + List remainingSnapshots = testSnapshots.subList(2, testSnapshots.size()); + + Map properties = new HashMap<>(finalBaseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(remainingSnapshots)); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(new HashMap<>())); // No refs since all are unreferenced + + TableMetadata newMetadata = finalBaseMetadata.replaceProperties(properties); + + // Use the operations instance with mock metrics reporter + openHouseInternalTableOperationsWithMockMetrics.snapshotDiffApplier.applySnapshots( + finalBaseMetadata, newMetadata); + + // Verify metrics were recorded + Mockito.verify(mockMetricsReporter) + .count( + eq(InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR), + eq((double) 2)); // 2 snapshots deleted + } + + /** + * Tests that snapshot deletion metrics are recorded when deleting unreferenced snapshots. + * Verifies that SNAPSHOTS_DELETED_CTR counter tracks deletions with branch references present. + */ + @Test + void testDeleteSnapshotMetricsRecordedBranch() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata with snapshots that have branch references + TableMetadata baseMetadata = + TableMetadata.buildFrom(BASE_TABLE_METADATA) + .addSnapshot(testSnapshots.get(0)) // Unreferenced - can be deleted + .addSnapshot(testSnapshots.get(1)) // Unreferenced - can be deleted + .setBranchSnapshot( + testSnapshots.get(2), SnapshotRef.MAIN_BRANCH) // Referenced - cannot be deleted + .build(); + + // Delete unreferenced snapshots (first two snapshots) + Snapshot referencedSnapshot = testSnapshots.get(2); + List remainingSnapshots = List.of(referencedSnapshot); + + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(remainingSnapshots)); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergTestUtil.createMainBranchRefPointingTo(referencedSnapshot))); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + // Use the operations instance with mock metrics reporter + openHouseInternalTableOperationsWithMockMetrics.snapshotDiffApplier.applySnapshots( + baseMetadata, newMetadata); + + // Verify metrics were recorded for the basic deletion + Mockito.verify(mockMetricsReporter) + .count( + eq(InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR), + eq((double) 2)); // 2 snapshots deleted + } + + /** + * Tests that snapshot deletion metrics are not recorded when no actual deletion occurs. Verifies + * that SNAPSHOTS_DELETED_CTR counter is not called for non-existent snapshots. + */ + @Test + void testDeleteSnapshotMetricsRecordedNonExistent() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata + TableMetadata baseMetadata = BASE_TABLE_METADATA; + for (Snapshot snapshot : testSnapshots) { + baseMetadata = + TableMetadata.buildFrom(baseMetadata) + .setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH) + .build(); + } + + // Make baseMetadata effectively final for lambda usage + final TableMetadata finalBaseMetadata = baseMetadata; + + // Create a snapshot that doesn't exist in the metadata + List extraSnapshots = IcebergTestUtil.getExtraSnapshots(); + Snapshot nonExistentSnapshot = extraSnapshots.get(0); + + // New metadata is same as base (non-existent snapshot can't be removed) + Snapshot lastSnapshot = testSnapshots.get(testSnapshots.size() - 1); + Map properties = new HashMap<>(finalBaseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, + SnapshotsUtil.serializedSnapshots(finalBaseMetadata.snapshots())); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(lastSnapshot))); + + TableMetadata newMetadata = finalBaseMetadata.replaceProperties(properties); + + // Use the operations instance with mock metrics reporter + openHouseInternalTableOperationsWithMockMetrics.snapshotDiffApplier.applySnapshots( + finalBaseMetadata, newMetadata); + + // Verify metrics are not recorded for non-existent snapshots (no actual deletion) + Mockito.verify(mockMetricsReporter, Mockito.never()) + .count(eq(InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR), Mockito.anyDouble()); + } + + /** + * Tests that deleting all unreferenced snapshots succeeds without errors. Verifies that all + * snapshots can be deleted when no branches or tags reference them. + */ + @Test + void testDeleteAllUnreferencedSnapshotsSucceeds() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + + // Create base metadata with unreferenced snapshots only (no main branch or other refs) + TableMetadata baseMetadata = BASE_TABLE_METADATA; + for (Snapshot snapshot : testSnapshots) { + baseMetadata = TableMetadata.buildFrom(baseMetadata).addSnapshot(snapshot).build(); + } + // Note: No setBranchSnapshot or setRef calls - all snapshots are unreferenced + + // Make baseMetadata effectively final for lambda usage + final TableMetadata finalBaseMetadata = baseMetadata; + + // Attempt to delete all unreferenced snapshots + Map properties = new HashMap<>(finalBaseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, + SnapshotsUtil.serializedSnapshots(List.of())); // Empty - all snapshots deleted + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(new HashMap<>())); // No refs + + TableMetadata newMetadata = finalBaseMetadata.replaceProperties(properties); + + // This should succeed since no snapshots are referenced by any branch/tag + TableMetadata result = + Assertions.assertDoesNotThrow( + () -> + openHouseInternalTableOperations.snapshotDiffApplier.applySnapshots( + finalBaseMetadata, newMetadata), + "Should succeed when deleting all unreferenced snapshots"); + + // Verify all snapshots were removed from the metadata + Assertions.assertEquals( + 0, + result.snapshots().size(), + "All unreferenced snapshots should be deleted, resulting in empty snapshots list"); + + // Verify deletion tracking shows all snapshots were deleted + Map resultProperties = result.properties(); + String deletedSnapshots = + resultProperties.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS)); + Assertions.assertNotNull(deletedSnapshots, "Deleted snapshots should be tracked"); + + for (Snapshot snapshot : testSnapshots) { + Assertions.assertTrue( + deletedSnapshots.contains(Long.toString(snapshot.snapshotId())), + "Snapshot " + snapshot.snapshotId() + " should be tracked as deleted"); + } + } } diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplierTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplierTest.java new file mode 100644 index 000000000..a1319475d --- /dev/null +++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotDiffApplierTest.java @@ -0,0 +1,934 @@ +package com.linkedin.openhouse.internal.catalog; + +import static com.linkedin.openhouse.internal.catalog.mapper.HouseTableSerdeUtils.getCanonicalFieldName; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter; +import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.SneakyThrows; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.SnapshotRefParser; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +/** + * Unit tests for {@link SnapshotDiffApplier}. Tests the refactored snapshot logic that was + * extracted from OpenHouseInternalTableOperations. + */ +public class SnapshotDiffApplierTest { + + private SnapshotDiffApplier snapshotDiffApplier; + private MetricsReporter mockMetricsReporter; + private TableMetadata baseMetadata; + private static final String TEST_TABLE_LOCATION = getTempLocation(); + + @SneakyThrows + private static String getTempLocation() { + return Files.createTempDirectory(UUID.randomUUID().toString()).toString(); + } + + @BeforeEach + void setup() { + mockMetricsReporter = Mockito.mock(MetricsReporter.class); + snapshotDiffApplier = new SnapshotDiffApplier(mockMetricsReporter); + + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + baseMetadata = + TableMetadata.newTableMetadata( + schema, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + TEST_TABLE_LOCATION, + new HashMap<>()); + } + + // ========== Helper Methods ========== + + /** + * Creates metadata with snapshots and refs properties for testing. + * + * @param base Base metadata to start from + * @param snapshots Snapshots to include + * @param refs Snapshot refs to include (nullable) + * @return Metadata with properties set + */ + private TableMetadata createMetadataWithSnapshots( + TableMetadata base, List snapshots, Map refs) { + Map properties = new HashMap<>(base.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(snapshots)); + if (refs != null) { + properties.put(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs)); + } + return base.replaceProperties(properties); + } + + /** + * Creates metadata with snapshots pointing to the last snapshot as main branch. + * + * @param base Base metadata to start from + * @param snapshots Snapshots to include + * @return Metadata with snapshots and main branch ref + */ + private TableMetadata createMetadataWithSnapshotsAndMainRef( + TableMetadata base, List snapshots) { + Map refs = + IcebergTestUtil.createMainBranchRefPointingTo(snapshots.get(snapshots.size() - 1)); + return createMetadataWithSnapshots(base, snapshots, refs); + } + + /** + * Adds snapshots to metadata and sets main branch to the last snapshot. + * + * @param metadata Base metadata + * @param snapshots Snapshots to add + * @return Updated metadata + */ + private TableMetadata addSnapshotsToMetadata(TableMetadata metadata, List snapshots) { + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Snapshot snapshot : snapshots) { + builder.addSnapshot(snapshot); + } + if (!snapshots.isEmpty()) { + Snapshot lastSnapshot = snapshots.get(snapshots.size() - 1); + SnapshotRef ref = SnapshotRef.branchBuilder(lastSnapshot.snapshotId()).build(); + builder.setRef(SnapshotRef.MAIN_BRANCH, ref); + } + return builder.build(); + } + + /** Verifies that when no snapshot JSON is provided, metadata is returned unmodified. */ + @Test + void testApplySnapshots_noSnapshotsJson_returnsUnmodified() { + TableMetadata result = snapshotDiffApplier.applySnapshots(null, baseMetadata); + + assertEquals(baseMetadata, result); + verifyNoInteractions(mockMetricsReporter); + } + + /** Verifies that table creation (null base) with main branch is handled correctly. */ + @Test + void testApplySnapshots_nullBase_handlesTableCreationWithMainBranch() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata newMetadata = createMetadataWithSnapshotsAndMainRef(baseMetadata, snapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(null, newMetadata); + + assertNotNull(result); + assertEquals(snapshots.size(), result.snapshots().size()); + } + + // ========== Basic Functionality Tests ========== + + /** Verifies that new snapshots are added correctly to the main branch. */ + @Test + void testApplySnapshots_addNewSnapshotsToMainBranch_success() throws IOException { + List initialSnapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, initialSnapshots); + + List allSnapshots = new ArrayList<>(initialSnapshots); + allSnapshots.addAll(IcebergTestUtil.getExtraSnapshots()); + TableMetadata newMetadata = + createMetadataWithSnapshotsAndMainRef(baseWithSnapshots, allSnapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + assertTrue(result.snapshots().size() > baseWithSnapshots.snapshots().size()); + verify(mockMetricsReporter, atLeastOnce()).count(anyString(), anyDouble()); + } + + /** Verifies that deleting snapshots from main branch works correctly. */ + @Test + void testApplySnapshots_deleteSnapshotsFromMainBranch_success() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + List remainingSnapshots = snapshots.subList(1, snapshots.size()); + TableMetadata newMetadata = + createMetadataWithSnapshotsAndMainRef(baseWithSnapshots, remainingSnapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + assertEquals(remainingSnapshots.size(), result.snapshots().size()); + } + + /** Verifies that updating main branch references works correctly. */ + @Test + void testApplySnapshots_mainBranchUpdates_success() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + Snapshot newBranchTarget = snapshots.get(1); + Map refs = IcebergTestUtil.createMainBranchRefPointingTo(newBranchTarget); + TableMetadata newMetadata = createMetadataWithSnapshots(baseWithSnapshots, snapshots, refs); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + assertNotNull(result.currentSnapshot()); + assertEquals(newBranchTarget.snapshotId(), result.currentSnapshot().snapshotId()); + } + + /** Verifies that snapshots are added in timestamp order to the main branch. */ + @Test + void testApplySnapshots_snapshotsOrderedByTimestamp_success() throws IOException { + List initialSnapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, initialSnapshots); + + // Add extra snapshots which may have different timestamps + List extraSnapshots = IcebergTestUtil.getExtraSnapshots(); + List allSnapshots = new ArrayList<>(initialSnapshots); + allSnapshots.addAll(extraSnapshots); + + // Deliberately shuffle to ensure ordering is not dependent on input order + List shuffledSnapshots = new ArrayList<>(allSnapshots); + Collections.shuffle(shuffledSnapshots); + + TableMetadata newMetadata = + createMetadataWithSnapshotsAndMainRef(baseWithSnapshots, shuffledSnapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + // Verify snapshots are ordered by timestamp + List resultSnapshots = result.snapshots(); + assertTrue(resultSnapshots.size() > 0, "Should have snapshots"); + + // Verify each snapshot timestamp is <= the next one + for (int i = 1; i < resultSnapshots.size(); i++) { + Snapshot prev = resultSnapshots.get(i - 1); + Snapshot current = resultSnapshots.get(i); + assertTrue( + prev.timestampMillis() <= current.timestampMillis(), + String.format( + "Snapshots should be ordered by timestamp: snapshot[%d].timestamp=%d " + + "should be <= snapshot[%d].timestamp=%d", + i - 1, prev.timestampMillis(), i, current.timestampMillis())); + } + } + + // ========== Validation Tests ========== + + /** + * Verifies that deleting the current snapshot from main branch without replacements throws an + * exception. + */ + @Test + void testApplySnapshots_deletingCurrentSnapshotFromMainBranchWithoutReplacement_throwsException() + throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + TableMetadata newMetadata = + createMetadataWithSnapshots(baseWithSnapshots, Collections.emptyList(), new HashMap<>()); + + InvalidIcebergSnapshotException exception = + assertThrows( + InvalidIcebergSnapshotException.class, + () -> snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata)); + + assertTrue(exception.getMessage().contains("Cannot delete the current snapshot")); + } + + /** Verifies that duplicate snapshot IDs in provided snapshots throw an exception. */ + @Test + void testApplySnapshots_duplicateSnapshotIds_throwsException() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + // Create a list with duplicate snapshots (same snapshot ID appears twice) + List duplicateSnapshots = new ArrayList<>(); + duplicateSnapshots.add(snapshots.get(0)); + duplicateSnapshots.add(snapshots.get(0)); // Duplicate + + TableMetadata newMetadata = + createMetadataWithSnapshotsAndMainRef(baseWithSnapshots, duplicateSnapshots); + + // Should throw IllegalStateException due to duplicate keys in toMap collector + assertThrows( + IllegalStateException.class, + () -> snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata)); + } + + // ========== Metrics Tests ========== + + /** Verifies that staged snapshots (not on main branch) trigger the correct metrics. */ + @Test + void testMetrics_addStagedSnapshots_recordsStagedCounter() throws IOException { + List baseSnapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, baseSnapshots); + + List wapSnapshots = IcebergTestUtil.getWapSnapshots(); + List allSnapshots = new ArrayList<>(baseSnapshots); + allSnapshots.addAll(wapSnapshots); + + Map refs = + IcebergTestUtil.createMainBranchRefPointingTo(baseSnapshots.get(baseSnapshots.size() - 1)); + TableMetadata newMetadata = createMetadataWithSnapshots(baseWithSnapshots, allSnapshots, refs); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + verify(mockMetricsReporter) + .count(eq(InternalCatalogMetricsConstant.SNAPSHOTS_STAGED_CTR), anyDouble()); + } + + /** Verifies that deleting snapshots from main branch triggers the correct metrics. */ + @Test + void testMetrics_deleteSnapshotsFromMainBranch_recordsDeletedCounter() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + List remainingSnapshots = snapshots.subList(1, snapshots.size()); + TableMetadata newMetadata = + createMetadataWithSnapshotsAndMainRef(baseWithSnapshots, remainingSnapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + assertEquals(remainingSnapshots.size(), result.snapshots().size()); + verify(mockMetricsReporter) + .count(eq(InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR), eq(1.0)); + } + + // ========== Property Management Tests ========== + + /** Verifies that appended snapshot IDs to main branch are recorded in properties. */ + @Test + void testProperties_appendedSnapshotsToMainBranch_recordedCorrectly() throws IOException { + List baseSnapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, baseSnapshots); + + List newSnapshotsList = IcebergTestUtil.getExtraSnapshots(); + List allSnapshots = new ArrayList<>(baseSnapshots); + allSnapshots.addAll(newSnapshotsList); + TableMetadata newMetadata = + createMetadataWithSnapshotsAndMainRef(baseWithSnapshots, allSnapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + assertNotNull(result); + String appendedSnapshots = + result.properties().get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)); + assertNotNull(appendedSnapshots, "Appended snapshots should be recorded in properties"); + + // Verify actual snapshot IDs are present + for (Snapshot newSnapshot : newSnapshotsList) { + assertTrue( + appendedSnapshots.contains(String.valueOf(newSnapshot.snapshotId())), + "Snapshot ID " + newSnapshot.snapshotId() + " should be in appended_snapshots"); + } + } + + /** + * Verifies that temporary snapshot processing keys are removed from final properties when adding + * to main branch. + */ + @Test + void testProperties_tempKeysRemovedForMainBranch_success() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata newMetadata = createMetadataWithSnapshotsAndMainRef(baseMetadata, snapshots); + + TableMetadata result = snapshotDiffApplier.applySnapshots(null, newMetadata); + + assertNotNull(result); + assertFalse( + result.properties().containsKey(CatalogConstants.SNAPSHOTS_JSON_KEY), + "Temp snapshots JSON key should be removed"); + assertFalse( + result.properties().containsKey(CatalogConstants.SNAPSHOTS_REFS_KEY), + "Temp snapshots refs key should be removed"); + } + + /** Verifies that providing a non-MAIN branch reference throws UnsupportedOperationException. */ + @Test + void testApplySnapshots_nonMainBranchReference_throwsUnsupportedOperationException() + throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + Snapshot lastSnapshot = snapshots.get(snapshots.size() - 1); + + // Create refs with a feature branch instead of MAIN + Map refs = new HashMap<>(); + SnapshotRef featureBranchRef = SnapshotRef.branchBuilder(lastSnapshot.snapshotId()).build(); + refs.put("feature-branch", SnapshotRefParser.toJson(featureBranchRef)); + + TableMetadata newMetadata = createMetadataWithSnapshots(baseMetadata, snapshots, refs); + + UnsupportedOperationException exception = + assertThrows( + UnsupportedOperationException.class, + () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + + assertTrue(exception.getMessage().contains("OpenHouse supports only MAIN branch")); + } + + /** + * Verifies that providing a branch ref pointing to a non-existent snapshot ID causes an + * exception. This tests a critical bug where no validation exists before calling + * setBranchSnapshot. + */ + @Test + void testApplySnapshots_refPointingToNonExistentSnapshot_throwsException() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + + // Create a ref pointing to a snapshot ID that doesn't exist in the snapshot list + long nonExistentSnapshotId = 999999999L; + Map refs = new HashMap<>(); + SnapshotRef invalidRef = SnapshotRef.branchBuilder(nonExistentSnapshotId).build(); + refs.put(SnapshotRef.MAIN_BRANCH, SnapshotRefParser.toJson(invalidRef)); + + TableMetadata newMetadata = createMetadataWithSnapshots(baseMetadata, snapshots, refs); + + // Iceberg's setBranchSnapshot should throw ValidationException when snapshot doesn't exist + assertThrows( + ValidationException.class, () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + } + + /** + * Verifies that attempting to set a ref to a snapshot being deleted throws an exception. The + * validation correctly catches this case where a commit attempts to both delete a snapshot and + * set the main branch to point to that deleted snapshot. This prevents leaving the table in an + * invalid state. + */ + @Test + void testApplySnapshots_settingRefToDeletedSnapshot_throwsException() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + // Try to delete the first snapshot, then point main branch to the first (deleted) one + Snapshot snapshotToDelete = snapshots.get(0); + List remainingSnapshots = snapshots.subList(1, snapshots.size()); + + // Create refs pointing to the snapshot we're trying to delete + Map refs = new HashMap<>(); + SnapshotRef mainRef = SnapshotRef.branchBuilder(snapshotToDelete.snapshotId()).build(); + refs.put(SnapshotRef.MAIN_BRANCH, SnapshotRefParser.toJson(mainRef)); + + TableMetadata newMetadata = + createMetadataWithSnapshots(baseWithSnapshots, remainingSnapshots, refs); + + // This should throw an exception because we're trying to delete a snapshot + // while setting a branch reference to it + InvalidIcebergSnapshotException exception = + assertThrows( + InvalidIcebergSnapshotException.class, + () -> snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata)); + + assertTrue( + exception + .getMessage() + .contains("Cannot delete snapshots that are still referenced by branches/tags")); + assertTrue(exception.getMessage().contains("snapshot " + snapshotToDelete.snapshotId())); + assertTrue(exception.getMessage().contains("main")); + } + + /** + * Verifies that a snapshot with an invalid (non-numeric) source snapshot ID in cherry-pick causes + * JsonSyntaxException during parsing. NOTE: This fails at the JSON parsing stage due to Iceberg's + * strict validation, not at the cherry-pick categorization stage. + */ + @Test + void testApplySnapshots_invalidCherryPickSourceSnapshotId_failsAtParsingStage() { + // Create a custom snapshot JSON with invalid source-snapshot-id using Gson + // Note: Iceberg validates snapshot structure strictly, so this fails at Gson parsing + Gson gson = new Gson(); + JsonObject snapshotJson = new JsonObject(); + snapshotJson.addProperty("snapshot-id", 1234567890123456789L); + snapshotJson.addProperty("timestamp-ms", 1669126937912L); + JsonObject summary = new JsonObject(); + summary.addProperty("operation", "append"); + summary.addProperty("source-snapshot-id", "not-a-number"); + snapshotJson.add("summary", summary); + snapshotJson.addProperty("manifest-list", "/tmp/test.avro"); + snapshotJson.addProperty("schema-id", 0); + + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put(CatalogConstants.SNAPSHOTS_JSON_KEY, "[" + gson.toJson(snapshotJson) + "]"); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + // Should throw JsonSyntaxException when Gson tries to parse the invalid source-snapshot-id + assertThrows( + com.google.gson.JsonSyntaxException.class, + () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + } + + /** + * Verifies that a snapshot with null summary is handled correctly during WAP detection. Tests + * lines 172, 180, 202 which check snapshot.summary(). NOTE: This currently fails at Iceberg's + * parsing stage due to strict validation. + */ + @Test + void testApplySnapshots_snapshotWithNullSummary_failsAtParsingStage() { + // Create a custom snapshot JSON with null/missing summary using Gson + // Note: Iceberg validates snapshot structure strictly, so this fails at parsing + Gson gson = new Gson(); + JsonObject snapshotJson = new JsonObject(); + snapshotJson.addProperty("snapshot-id", 1234567890123456789L); + snapshotJson.addProperty("timestamp-ms", 1669126937912L); + snapshotJson.addProperty("manifest-list", "/tmp/test.avro"); + snapshotJson.addProperty("schema-id", 0); + + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put(CatalogConstants.SNAPSHOTS_JSON_KEY, "[" + gson.toJson(snapshotJson) + "]"); + + // Add a main branch ref pointing to this snapshot + Map refs = new HashMap<>(); + SnapshotRef mainRef = SnapshotRef.branchBuilder(1234567890123456789L).build(); + refs.put(SnapshotRef.MAIN_BRANCH, SnapshotRefParser.toJson(mainRef)); + properties.put(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs)); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + // Should throw JsonSyntaxException during Iceberg parsing due to missing required summary + assertThrows( + com.google.gson.JsonSyntaxException.class, + () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + } + + /** + * Verifies behavior when provided snapshots are empty but refs are not. Tests that a ref pointing + * to nothing causes an exception. + */ + @Test + void testApplySnapshots_emptySnapshotsWithNonEmptyRefs_throwsException() { + // Create refs pointing to a snapshot that doesn't exist + Map refs = new HashMap<>(); + SnapshotRef mainRef = SnapshotRef.branchBuilder(123456789L).build(); + refs.put(SnapshotRef.MAIN_BRANCH, SnapshotRefParser.toJson(mainRef)); + + TableMetadata newMetadata = + createMetadataWithSnapshots(baseMetadata, Collections.emptyList(), refs); + + // Should throw ValidationException because ref points to non-existent snapshot + assertThrows( + org.apache.iceberg.exceptions.ValidationException.class, + () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + } + + /** Verifies that null providedMetadata throws NullPointerException. */ + @Test + void testApplySnapshots_nullProvidedMetadata_throwsNullPointerException() { + NullPointerException exception = + assertThrows( + NullPointerException.class, + () -> snapshotDiffApplier.applySnapshots(baseMetadata, null)); + + assertTrue(exception.getMessage().contains("providedMetadata cannot be null")); + } + + /** Verifies that malformed JSON in SNAPSHOTS_JSON_KEY property throws exception. */ + @Test + void testApplySnapshots_malformedSnapshotsJson_throwsException() { + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put(CatalogConstants.SNAPSHOTS_JSON_KEY, "{ invalid json {{"); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + // Should throw JsonSyntaxException or similar from Gson + assertThrows( + com.google.gson.JsonSyntaxException.class, + () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + } + + /** Verifies that malformed JSON in SNAPSHOTS_REFS_KEY property throws exception. */ + @Test + void testApplySnapshots_malformedRefsJson_throwsException() throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + Map properties = new HashMap<>(baseMetadata.properties()); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(snapshots)); + properties.put(CatalogConstants.SNAPSHOTS_REFS_KEY, "{ invalid json {{"); + + TableMetadata newMetadata = baseMetadata.replaceProperties(properties); + + // Should throw JsonSyntaxException or similar from Gson + assertThrows( + com.google.gson.JsonSyntaxException.class, + () -> snapshotDiffApplier.applySnapshots(null, newMetadata)); + } + + /** + * Verifies behavior when attempting to delete all snapshots with no replacement. This should be + * caught by the existing validation. + */ + @Test + void testApplySnapshots_deletingAllSnapshotsWithNoReplacement_throwsException() + throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, snapshots); + + // Try to delete all snapshots without providing replacements + TableMetadata newMetadata = + createMetadataWithSnapshots(baseWithSnapshots, Collections.emptyList(), new HashMap<>()); + + InvalidIcebergSnapshotException exception = + assertThrows( + InvalidIcebergSnapshotException.class, + () -> snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata)); + + assertTrue(exception.getMessage().contains("Cannot delete the current snapshot")); + } + + /** + * Verifies transition from table with unreferenced snapshots to having a MAIN branch. Tests + * ref-only update without snapshot changes. + */ + @Test + void testApplySnapshots_baseWithUnreferencedSnapshotsOnly_addFirstMainBranch() + throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + + // Create base with snapshots but no refs (all unreferenced) + TableMetadata base = baseMetadata; + for (Snapshot snapshot : snapshots) { + base = TableMetadata.buildFrom(base).addSnapshot(snapshot).build(); + } + // Verify no refs in base + assertTrue(base.refs().isEmpty() || !base.refs().containsKey(SnapshotRef.MAIN_BRANCH)); + + // Provided: same snapshots + MAIN ref to one of them + Snapshot mainSnapshot = snapshots.get(2); + Map refs = IcebergTestUtil.createMainBranchRefPointingTo(mainSnapshot); + TableMetadata newMetadata = createMetadataWithSnapshots(base, snapshots, refs); + + TableMetadata result = snapshotDiffApplier.applySnapshots(base, newMetadata); + + // Verify MAIN ref is set + assertNotNull(result.currentSnapshot()); + assertEquals(mainSnapshot.snapshotId(), result.currentSnapshot().snapshotId()); + + // Verify no add/delete operations (ref-only update) + assertEquals(snapshots.size(), result.snapshots().size()); + Map resultProps = result.properties(); + assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS))); + assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS))); + } + + /** + * Verifies table creation with no snapshots (empty state). Tests that an empty table can be + * created successfully. + */ + @Test + void testApplySnapshots_nullBaseEmptySnapshotsEmptyRefs_createsEmptyTable() { + // Provided: empty snapshots list, empty refs + TableMetadata newMetadata = + createMetadataWithSnapshots(baseMetadata, Collections.emptyList(), new HashMap<>()); + + TableMetadata result = snapshotDiffApplier.applySnapshots(null, newMetadata); + + // Verify empty table created + assertNotNull(result); + assertEquals(0, result.snapshots().size()); + assertNull(result.currentSnapshot()); + assertTrue(result.refs().isEmpty() || !result.refs().containsKey(SnapshotRef.MAIN_BRANCH)); + + // Verify no snapshot operations tracked + Map resultProps = result.properties(); + assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS))); + assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS))); + assertNull(resultProps.get(getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS))); + } + + /** + * Verifies adding both regular and staged snapshots in a single commit. Tests that snapshot + * categorization correctly handles mixed types. + */ + @Test + void testApplySnapshots_addRegularAndStagedSimultaneously() throws IOException { + // Start from empty base (no existing snapshots) + // Simulate a commit that adds both regular and staged snapshots simultaneously + + List extraSnapshots = IcebergTestUtil.getExtraSnapshots(); + + // Create a custom WAP snapshot without hardcoded sequence number to avoid conflicts + // Build snapshot JSON manually and wrap it in a Gson array + String wapSnapshotJson = + String.format( + "{\"snapshot-id\":%d,\"timestamp-ms\":%d,\"summary\":%s,\"manifest-list\":\"%s\",\"schema-id\":%d}", + 999940701710231339L, + 1669126937912L, + new Gson() + .toJson( + Map.of( + "operation", "append", + "wap.id", "test-wap", + "spark.app.id", "local-1669126906634", + "added-data-files", "1", + "added-records", "1")), + "/data/test.avro", + 0); + String wapSnapshotArrayJson = new Gson().toJson(List.of(wapSnapshotJson)); + List customWapSnapshots = SnapshotsUtil.parseSnapshots(null, wapSnapshotArrayJson); + + List allSnapshots = new ArrayList<>(); + allSnapshots.add(extraSnapshots.get(0)); // New regular snapshot + allSnapshots.add(customWapSnapshots.get(0)); // New staged snapshot + + // MAIN ref points to the new regular snapshot + Map refs = IcebergTestUtil.createMainBranchRefPointingTo(extraSnapshots.get(0)); + TableMetadata newMetadata = createMetadataWithSnapshots(baseMetadata, allSnapshots, refs); + + TableMetadata result = snapshotDiffApplier.applySnapshots(null, newMetadata); + + // Verify both snapshots added + assertEquals(2, result.snapshots().size()); + + // Verify regular snapshot is on MAIN + assertNotNull(result.currentSnapshot()); + assertEquals(extraSnapshots.get(0).snapshotId(), result.currentSnapshot().snapshotId()); + + // Verify tracking: regular appended, staged tracked separately + Map resultProps = result.properties(); + String appendedSnapshotsStr = + resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)); + String stagedSnapshotsStr = + resultProps.get(getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS)); + + assertNotNull(appendedSnapshotsStr); + assertTrue(appendedSnapshotsStr.contains(Long.toString(extraSnapshots.get(0).snapshotId()))); + + assertNotNull(stagedSnapshotsStr); + assertTrue(stagedSnapshotsStr.contains(Long.toString(customWapSnapshots.get(0).snapshotId()))); + } + + /** + * Verifies cherry-picking a staged snapshot while adding a new snapshot in the same commit. Tests + * compound operation tracking. + */ + @Test + void testApplySnapshots_cherryPickAndAddNewSimultaneously() throws IOException { + List testWapSnapshots = IcebergTestUtil.getWapSnapshots(); + + // Base: MAIN snapshot + staged snapshot + TableMetadata base = + TableMetadata.buildFrom(baseMetadata) + .setBranchSnapshot(testWapSnapshots.get(0), SnapshotRef.MAIN_BRANCH) + .addSnapshot(testWapSnapshots.get(1)) // Staged snapshot + .build(); + + // Provided: existing + new snapshot becomes MAIN, staged is cherry-picked + List allSnapshots = new ArrayList<>(); + allSnapshots.add(testWapSnapshots.get(0)); + allSnapshots.add(testWapSnapshots.get(1)); // Was staged, now cherry-picked + allSnapshots.add(testWapSnapshots.get(2)); // New snapshot + + // MAIN ref points to new snapshot + Map refs = + IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(2)); + TableMetadata newMetadata = createMetadataWithSnapshots(base, allSnapshots, refs); + + TableMetadata result = snapshotDiffApplier.applySnapshots(base, newMetadata); + + // Verify new snapshot is on MAIN + assertNotNull(result.currentSnapshot()); + assertEquals(testWapSnapshots.get(2).snapshotId(), result.currentSnapshot().snapshotId()); + + // Verify both operations tracked + Map resultProps = result.properties(); + String appendedSnapshotsStr = + resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)); + String cherryPickedSnapshotsStr = + resultProps.get(getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS)); + + // New snapshot should be appended + assertNotNull(appendedSnapshotsStr); + assertTrue(appendedSnapshotsStr.contains(Long.toString(testWapSnapshots.get(2).snapshotId()))); + + // Staged snapshot should be cherry-picked + assertNotNull(cherryPickedSnapshotsStr); + assertTrue( + cherryPickedSnapshotsStr.contains(Long.toString(testWapSnapshots.get(1).snapshotId()))); + } + + /** + * Verifies that attempting to delete the current snapshot while unreferenced snapshots exist + * throws an exception. Tests current snapshot protection. + */ + @Test + void testApplySnapshots_attemptDeleteCurrentWithUnreferencedPresent_throwsException() + throws IOException { + List snapshots = IcebergTestUtil.getSnapshots(); + + // Base: MAIN snapshot + 2 unreferenced snapshots + TableMetadata base = + TableMetadata.buildFrom(baseMetadata) + .addSnapshot(snapshots.get(0)) // Unreferenced + .addSnapshot(snapshots.get(1)) // Unreferenced + .setBranchSnapshot(snapshots.get(2), SnapshotRef.MAIN_BRANCH) // Current snapshot + .build(); + + // Provided: only the 2 unreferenced (delete MAIN), no new snapshots + List remainingSnapshots = snapshots.subList(0, 2); + TableMetadata newMetadata = + createMetadataWithSnapshots(base, remainingSnapshots, new HashMap<>()); + + // Should throw exception because current snapshot is being deleted without replacement + InvalidIcebergSnapshotException exception = + assertThrows( + InvalidIcebergSnapshotException.class, + () -> snapshotDiffApplier.applySnapshots(base, newMetadata)); + + assertTrue(exception.getMessage().contains("Cannot delete the current snapshot")); + assertTrue(exception.getMessage().contains(Long.toString(snapshots.get(2).snapshotId()))); + } + + /** + * Verifies adding regular (non-WAP) snapshots with empty refs. historically, such snapshots were + * automatically added to MAIN branch and tracked as APPENDED_SNAPSHOTS. This test validates + * backward compatibility with that behavior. NOTE: The semantics here are questionable - + * snapshots with no refs should arguably not be "appended" to MAIN, but this preserves the + * original behavior. + */ + @Test + void testApplySnapshots_regularSnapshotsWithEmptyRefs_autoAppendedToMain() throws IOException { + List baseSnapshots = IcebergTestUtil.getSnapshots(); + TableMetadata baseWithSnapshots = addSnapshotsToMetadata(baseMetadata, baseSnapshots); + + // Provided: existing + new snapshots, but empty refs map (no MAIN branch) + List extraSnapshots = IcebergTestUtil.getExtraSnapshots(); + List allSnapshots = new ArrayList<>(baseSnapshots); + allSnapshots.addAll(extraSnapshots); + + // Empty refs - no MAIN branch + TableMetadata newMetadata = + createMetadataWithSnapshots(baseWithSnapshots, allSnapshots, new HashMap<>()); + + TableMetadata result = snapshotDiffApplier.applySnapshots(baseWithSnapshots, newMetadata); + + // Verify new snapshots added + assertEquals(allSnapshots.size(), result.snapshots().size()); + + // Verify MAIN branch points to the latest snapshot (auto-appended to main) + assertNotNull(result.ref(SnapshotRef.MAIN_BRANCH)); + assertEquals( + allSnapshots.get(allSnapshots.size() - 1).snapshotId(), + result.ref(SnapshotRef.MAIN_BRANCH).snapshotId()); + + // Verify new snapshots tracked as appended (even though unreferenced, they're not staged WAP) + Map resultProps = result.properties(); + String appendedSnapshotsStr = + resultProps.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)); + + assertNotNull(appendedSnapshotsStr); + for (Snapshot extraSnapshot : extraSnapshots) { + assertTrue( + appendedSnapshotsStr.contains(Long.toString(extraSnapshot.snapshotId())), + "Snapshot " + extraSnapshot.snapshotId() + " should be tracked as appended"); + } + } + + /** + * Verifies cherry-picking multiple staged snapshots in sequence, testing both fast-forward and + * rebase scenarios. wap1 and wap2 both have the same parent. Cherry-picking wap1 first is a + * fast-forward (no new snapshot). Cherry-picking wap2 after main has moved requires a rebase (new + * snapshot created). + */ + @Test + void testApplySnapshots_cherryPickMultipleStagedSnapshotsOutOfOrder() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + List testWapSnapshots = IcebergTestUtil.getWapSnapshots(); + + // Setup: MAIN snapshot + 2 staged WAP snapshots (wap1, wap2) + TableMetadata base = + TableMetadata.buildFrom(baseMetadata) + .setBranchSnapshot(testSnapshots.get(0), SnapshotRef.MAIN_BRANCH) + .addSnapshot(testWapSnapshots.get(0)) // wap1 (wap.id="wap1") + .addSnapshot(testWapSnapshots.get(1)) // wap2 (wap.id="wap2") + .build(); + + // Step 1: Fast-forward cherry-pick wap1 + // wap1's parent == current main, so it's promoted directly (no new snapshot) + List allSnapshots1 = new ArrayList<>(); + allSnapshots1.add(testSnapshots.get(0)); + allSnapshots1.add(testWapSnapshots.get(0)); // wap1 now on main + allSnapshots1.add(testWapSnapshots.get(1)); // wap2 still staged + + // Set MAIN branch to point to wap1 + Map refs1 = + IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(0)); + TableMetadata newMetadata1 = createMetadataWithSnapshots(base, allSnapshots1, refs1); + + TableMetadata result1 = snapshotDiffApplier.applySnapshots(base, newMetadata1); + + // Verify fast-forward: only cherry_picked tracked, no new snapshot appended + assertNotNull(result1.currentSnapshot()); + assertEquals(testWapSnapshots.get(0).snapshotId(), result1.currentSnapshot().snapshotId()); + + Map resultProps1 = result1.properties(); + String cherryPickedSnapshots1 = + resultProps1.get(getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS)); + assertNotNull(cherryPickedSnapshots1); + assertTrue( + cherryPickedSnapshots1.contains(Long.toString(testWapSnapshots.get(0).snapshotId())), + "wap1 should be tracked as cherry-picked"); + assertNull( + resultProps1.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)), + "No new snapshot for fast-forward"); + + // Step 2: Rebase cherry-pick wap2 + // wap2's parent != current main (which is now wap1), so a new snapshot is created + // New snapshot has: parent=wap1, source-snapshot-id=wap2, published.wap.id="wap2" + List allSnapshots2 = new ArrayList<>(); + allSnapshots2.add(testSnapshots.get(0)); + allSnapshots2.add(testWapSnapshots.get(0)); // wap1 + allSnapshots2.add(testWapSnapshots.get(1)); // wap2 (source) + allSnapshots2.add(testWapSnapshots.get(2)); // New rebased snapshot + + Map refs2 = + IcebergTestUtil.createMainBranchRefPointingTo(testWapSnapshots.get(2)); + TableMetadata newMetadata2 = createMetadataWithSnapshots(result1, allSnapshots2, refs2); + + TableMetadata result2 = snapshotDiffApplier.applySnapshots(result1, newMetadata2); + + // Verify rebase: both cherry_picked (source) and appended (new snapshot) tracked + assertNotNull(result2.currentSnapshot()); + assertEquals(testWapSnapshots.get(2).snapshotId(), result2.currentSnapshot().snapshotId()); + + Map resultProps2 = result2.properties(); + + String cherryPickedSnapshots2 = + resultProps2.get(getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS)); + assertNotNull(cherryPickedSnapshots2); + assertTrue( + cherryPickedSnapshots2.contains(Long.toString(testWapSnapshots.get(1).snapshotId())), + "wap2 should be tracked as cherry-picked (source)"); + + String appendedSnapshots2 = + resultProps2.get(getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS)); + assertNotNull(appendedSnapshots2); + assertTrue( + appendedSnapshots2.contains(Long.toString(testWapSnapshots.get(2).snapshotId())), + "New rebased snapshot should be tracked as appended"); + + // Verify all 4 snapshots present + assertEquals(4, result2.snapshots().size()); + verify(mockMetricsReporter, atLeastOnce()) + .count(eq(InternalCatalogMetricsConstant.SNAPSHOTS_CHERRY_PICKED_CTR), anyDouble()); + } +} diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotInspectorTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotInspectorTest.java deleted file mode 100644 index 3fb9ced17..000000000 --- a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/SnapshotInspectorTest.java +++ /dev/null @@ -1,171 +0,0 @@ -package com.linkedin.openhouse.internal.catalog; - -import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException; -import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapperTest; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.ManifestWriter; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotRef; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.hadoop.HadoopOutputFile; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mockito; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; - -@SpringBootTest -@Import(HouseTableMapperTest.MockConfiguration.class) -class SnapshotInspectorTest { - - @Autowired SnapshotInspector snapshotInspector; - - @TempDir static Path tempDir; - - private static final TableMetadata NO_SNAPSHOTS_METADATA = - TableMetadata.newTableMetadata( - new Schema( - Types.NestedField.required(1, "data", Types.StringType.get()), - Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone())), - PartitionSpec.unpartitioned(), - UUID.randomUUID().toString(), - ImmutableMap.of()); - - @Test - void testValidateSnapshotsUpdateWithNoSnapshotMetadata() throws IOException { - - List testSnapshots = IcebergTestUtil.getSnapshots(); - // No exception since added as well deleted snapshots are allowed to support replication - // use case which performs table commit with added and deleted snapshots. - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - NO_SNAPSHOTS_METADATA, testSnapshots.subList(0, 1), testSnapshots.subList(1, 4))); - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - NO_SNAPSHOTS_METADATA, testSnapshots, Collections.emptyList())); - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - NO_SNAPSHOTS_METADATA, Collections.emptyList(), testSnapshots)); - } - - @Test - void testValidateSnapshotsUpdateWithSnapshotMetadata() throws IOException { - List testSnapshots = IcebergTestUtil.getSnapshots(); - List extraTestSnapshots = IcebergTestUtil.getExtraSnapshots(); - TableMetadata metadataWithSnapshots = - TableMetadata.buildFrom(NO_SNAPSHOTS_METADATA) - .setBranchSnapshot(testSnapshots.get(testSnapshots.size() - 1), SnapshotRef.MAIN_BRANCH) - .build(); - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - metadataWithSnapshots, testSnapshots, Collections.emptyList())); - // No validation error if snapshots are added and deleted - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - metadataWithSnapshots, testSnapshots, testSnapshots)); - // No validation error if snapshots are added and deleted - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - metadataWithSnapshots, extraTestSnapshots, testSnapshots)); - Assertions.assertThrows( - InvalidIcebergSnapshotException.class, - () -> - snapshotInspector.validateSnapshotsUpdate( - metadataWithSnapshots, Collections.emptyList(), testSnapshots)); - Assertions.assertDoesNotThrow( - () -> - snapshotInspector.validateSnapshotsUpdate( - metadataWithSnapshots, - Collections.emptyList(), - testSnapshots.subList(0, testSnapshots.size() - 1))); - } - - @Test - void testSecureSnapshot() throws IOException { - // The default file attribute that sets the permission as 777 when a file is created. - FileAttribute> attr = - PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxrwx")); - - // Mock DataFile and ManifestFile - Snapshot mockSnapshot = Mockito.mock(org.apache.iceberg.Snapshot.class); - Path tempFile1 = Files.createFile(tempDir.resolve("data1.parquet"), attr); - Path tempFile2 = Files.createFile(tempDir.resolve("data2.parquet"), attr); - Path tempFile3 = Files.createFile(tempDir.resolve("manifest"), attr); - - // Mock FileIO - FileIO fileIO = Mockito.mock(org.apache.iceberg.io.FileIO.class); - - List dataFileList = - ImmutableList.of( - createDataFile(tempFile1.toString()), createDataFile(tempFile2.toString())); - - ManifestWriter manifestWriter = - ManifestFiles.write( - PartitionSpec.unpartitioned(), - HadoopOutputFile.fromLocation(tempFile3.toString(), new Configuration())); - manifestWriter.close(); - - Mockito.when(mockSnapshot.allManifests(fileIO)) - .thenReturn(ImmutableList.of(manifestWriter.toManifestFile())); - Mockito.when(mockSnapshot.addedDataFiles(fileIO)).thenReturn(dataFileList); - snapshotInspector.secureSnapshot(mockSnapshot, fileIO); - - /* Verify the perms of files are modified as com.linkedin.openhouse.internal.catalog.MockApplication.perm does */ - FileSystem fileSystem = FileSystem.get(new Configuration()); - Assertions.assertEquals( - fileSystem - .getFileStatus(new org.apache.hadoop.fs.Path(tempFile1.toString())) - .getPermission(), - MockApplication.FS_PERMISSION); - Assertions.assertEquals( - fileSystem - .getFileStatus(new org.apache.hadoop.fs.Path(tempFile2.toString())) - .getPermission(), - MockApplication.FS_PERMISSION); - Assertions.assertEquals( - fileSystem - .getFileStatus(new org.apache.hadoop.fs.Path(tempFile3.toString())) - .getPermission(), - MockApplication.FS_PERMISSION); - } - - public static DataFile createDataFile(String dataPath) throws IOException { - Files.write(Paths.get(dataPath), Lists.newArrayList(), StandardCharsets.UTF_8); - return DataFiles.builder(PartitionSpec.unpartitioned()) - .withPath(dataPath) - .withFileSizeInBytes(10) - .withRecordCount(1) - .build(); - } -} diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java index 85044da5e..ceb3f9e26 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java @@ -8,7 +8,7 @@ import com.linkedin.openhouse.cluster.storage.StorageManager; import com.linkedin.openhouse.common.test.cluster.PropertyOverrideContextInitializer; import com.linkedin.openhouse.internal.catalog.OpenHouseInternalTableOperations; -import com.linkedin.openhouse.internal.catalog.SnapshotInspector; +import com.linkedin.openhouse.internal.catalog.SnapshotDiffApplier; import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager; import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper; import com.linkedin.openhouse.internal.catalog.model.HouseTable; @@ -60,12 +60,12 @@ public class RepositoryTestWithSettableComponents { @Autowired FileIOManager fileIOManager; - @Autowired SnapshotInspector snapshotInspector; - @Autowired HouseTableMapper houseTableMapper; @Autowired MeterRegistry meterRegistry; + @Autowired SnapshotDiffApplier snapshotDiffApplier; + FileIO fileIO; @PostConstruct @@ -97,15 +97,17 @@ void testNoRetryInternalRepo() { // construct a real table object to prepare subsequent client call for table-update (that they // will fail) + MetricsReporter metricsReporter = + new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList()); OpenHouseInternalTableOperations actualOps = new OpenHouseInternalTableOperations( houseTablesRepository, fileIO, - snapshotInspector, houseTableMapper, tableIdentifier, - new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList()), - fileIOManager); + metricsReporter, + fileIOManager, + snapshotDiffApplier); ((SettableCatalogForTest) catalog).setOperation(actualOps); TableDto creationDTO = TABLE_DTO.toBuilder().tableVersion(INITIAL_TABLE_VERSION).build(); creationDTO = openHouseInternalRepository.save(creationDTO); @@ -114,15 +116,17 @@ void testNoRetryInternalRepo() { // injecting mocked htsRepo within a tableOperation that fails doCommit method. // The requirement to trigger htsRepo.save call are: Detectable updates in Transaction itself. + MetricsReporter metricsReporter2 = + new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList()); OpenHouseInternalTableOperations mockOps = new OpenHouseInternalTableOperations( htsRepo, fileIO, - snapshotInspector, houseTableMapper, tableIdentifier, - new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList()), - fileIOManager); + metricsReporter2, + fileIOManager, + snapshotDiffApplier); OpenHouseInternalTableOperations spyOperations = Mockito.spy(mockOps); doReturn(actualOps.current()).when(spyOperations).refresh(); BaseTable spyOptsMockedTable = Mockito.spy(new BaseTable(spyOperations, realTable.name())); @@ -195,15 +199,17 @@ void testFailedHtsRepoWhenGet() { for (Class c : exs) { HouseTableRepository htsRepo = provideFailedHtsRepoWhenGet(c); + MetricsReporter metricsReporter = + new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList()); OpenHouseInternalTableOperations mockOps = new OpenHouseInternalTableOperations( htsRepo, fileIO, - snapshotInspector, houseTableMapper, tableIdentifier, - new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList()), - fileIOManager); + metricsReporter, + fileIOManager, + snapshotDiffApplier); OpenHouseInternalTableOperations spyOperations = Mockito.spy(mockOps); BaseTable spyOptsMockedTable = Mockito.spy( diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/SpringH2Application.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/SpringH2Application.java index d845e1b39..7cf0528ec 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/SpringH2Application.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/SpringH2Application.java @@ -5,9 +5,6 @@ import com.linkedin.openhouse.common.audit.model.ServiceAuditEvent; import com.linkedin.openhouse.tables.audit.DummyTableAuditHandler; import com.linkedin.openhouse.tables.audit.model.TableAuditEvent; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.apache.hadoop.fs.Path; import org.mockito.Mockito; import org.springframework.boot.SpringApplication; import org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration; @@ -17,7 +14,6 @@ import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Primary; @SpringBootApplication @ComponentScan( @@ -53,19 +49,6 @@ public static void main(String[] args) { SpringApplication.run(SpringH2Application.class, args); } - /** - * File secure used for testing purpose. We cannot directly use the actual - * SnapshotInspector#fileSecurer as that changes file to a user group that is not guaranteed to - * exist across different platforms thus creating environment dependencies for unit tests. - */ - @Bean - @Primary - Consumer> provideTestFileSecurer() { - return pathSupplier -> { - // This is a no-op Consumer. It does nothing with the supplied Path. - }; - } - @Bean public AuditHandler serviceAuditHandler() { return Mockito.mock(DummyServiceAuditHandler.class); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/settable/SettableTestConfig.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/settable/SettableTestConfig.java index 400b92b0f..f7d4f0124 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/settable/SettableTestConfig.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/settable/SettableTestConfig.java @@ -1,8 +1,12 @@ package com.linkedin.openhouse.tables.settable; +import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter; +import com.linkedin.openhouse.internal.catalog.SnapshotDiffApplier; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; import com.linkedin.openhouse.tables.repository.impl.SettableInternalRepositoryForTest; +import io.micrometer.core.instrument.MeterRegistry; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; @@ -20,4 +24,11 @@ public Catalog provideTestCatalog() { public OpenHouseInternalRepository provideTestInternalRepo() { return new SettableInternalRepositoryForTest(); } + + @Bean + public SnapshotDiffApplier snapshotDiffApplier(MeterRegistry meterRegistry) { + MetricsReporter metricsReporter = + new MetricsReporter(meterRegistry, "test", Lists.newArrayList()); + return new SnapshotDiffApplier(metricsReporter); + } } diff --git a/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/SpringH2TestApplication.java b/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/SpringH2TestApplication.java index 343c38d0d..0d85a24b0 100644 --- a/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/SpringH2TestApplication.java +++ b/tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/java/com/linkedin/openhouse/tablestest/SpringH2TestApplication.java @@ -1,17 +1,12 @@ package com.linkedin.openhouse.tablestest; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.apache.hadoop.fs.Path; import org.springframework.boot.SpringApplication; import org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Primary; @SpringBootApplication @ComponentScan( @@ -47,17 +42,4 @@ public class SpringH2TestApplication { public static void main(String[] args) { SpringApplication.run(SpringH2TestApplication.class, args); } - - /** - * File secure used for testing purpose. We cannot directly use the actual - * SnapshotInspector#fileSecurer as that changes file to a user group that is not guaranteed to - * exist across different platforms thus creating environment dependencies for unit tests. - */ - @Bean - @Primary - Consumer> provideTestFileSecurer() { - return pathSupplier -> { - // This is a no-op Consumer. It does nothing with the supplied Path. - }; - } }