Commits (35)
4b60c70 introducing branching (cbb330, Sep 29, 2025)
8546d43 wap branch green tests (cbb330, Sep 30, 2025)
e206888 accidentally commented line (cbb330, Sep 30, 2025)
ef1e5b4 remove test with old behavior (cbb330, Oct 1, 2025)
d0de1da fixing multi-branch commits and ambiguous references (cbb330, Oct 2, 2025)
554a3c3 refactoring for readability (cbb330, Oct 2, 2025)
ea5ff0e fixed edge case (cbb330, Oct 2, 2025)
4fc3792 refactoring to make more simple (cbb330, Oct 2, 2025)
9d6aec0 removing unused function (cbb330, Oct 2, 2025)
bf5a474 working tests for ambiguous commits (cbb330, Oct 7, 2025)
4d9dae0 tests for the replication use case (cbb330, Oct 7, 2025)
abdf335 refactoring pipeline (cbb330, Oct 8, 2025)
4087462 working tests and restructured code (cbb330, Oct 9, 2025)
a101d72 adding comments (cbb330, Oct 9, 2025)
11be438 working tests (cbb330, Oct 9, 2025)
c7426b4 complete refactor + new tests (cbb330, Oct 10, 2025)
afe2627 fixing broken tests (cbb330, Oct 10, 2025)
6ba98f5 centralizing maps/lists in constructor and reusing in applyTo (cbb330, Oct 21, 2025)
39b6cf1 responding to comments (cbb330, Oct 23, 2025)
fb0ff1b formatting (cbb330, Nov 4, 2025)
9b5a3d0 fixing small things (cbb330, Nov 4, 2025)
15e1337 removing props (cbb330, Nov 4, 2025)
75a1e2a changing update properties (cbb330, Nov 4, 2025)
c65dd95 fixing tests (cbb330, Nov 4, 2025)
5d3d03f fixing (cbb330, Nov 4, 2025)
088de1c fixing tests (cbb330, Nov 4, 2025)
3d1a758 small refactor (cbb330, Nov 4, 2025)
65666d2 updating containers (cbb330, Nov 5, 2025)
062f386 updating containers (cbb330, Nov 5, 2025)
85d8696 fixing tests (cbb330, Nov 5, 2025)
d64f57b cleaning up practices (cbb330, Nov 5, 2025)
37af32c small cleanup (cbb330, Nov 5, 2025)
71bebbe responding to comments (cbb330, Nov 8, 2025)
323aa5a adding more tests, and fixing small bug (cbb330, Nov 9, 2025)
b51d5fb responding to comments, adding test (cbb330, Nov 10, 2025)
OpenHouseInternalCatalog.java
@@ -59,22 +59,24 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

   @Autowired StorageType storageType;

-  @Autowired SnapshotInspector snapshotInspector;
-
   @Autowired HouseTableMapper houseTableMapper;

   @Autowired MeterRegistry meterRegistry;

   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    FileIO fileIO = resolveFileIO(tableIdentifier);
+    MetricsReporter metricsReporter =
+        new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList());
+    SnapshotDiffApplier snapshotDiffApplier = new SnapshotDiffApplier(metricsReporter);
     return new OpenHouseInternalTableOperations(
         houseTableRepository,
-        resolveFileIO(tableIdentifier),
-        snapshotInspector,
+        fileIO,
         houseTableMapper,
         tableIdentifier,
-        new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()),
-        fileIOManager);
+        metricsReporter,
+        fileIOManager,
+        snapshotDiffApplier);
   }

   @Override
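
Note on the argument reshuffle above: OpenHouseInternalTableOperations is annotated with Lombok's @AllArgsConstructor (visible in the next file), which generates one constructor parameter per field in declaration order. Adding the snapshotDiffApplier field therefore changes the generated signature, and the call in newTableOps has to list its arguments in the new field order. A minimal, self-contained illustration of that Lombok behavior (the Example class is hypothetical, and Lombok is assumed on the classpath):

import lombok.AllArgsConstructor;

// Lombok generates Example(int first, String second): one constructor parameter
// per field, in declaration order. Inserting or reordering fields silently
// changes the generated signature, so every positional call site must be
// updated, exactly as newTableOps(...) is updated in the hunk above.
@AllArgsConstructor
class Example {
  int first;
  String second;

  public static void main(String[] args) {
    Example e = new Example(1, "bound positionally, by declaration order");
    System.out.println(e.second);
  }
}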
OpenHouseInternalTableOperations.java
@@ -23,28 +23,19 @@
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotRef;
-import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.SortOrder;
@@ -60,7 +51,6 @@
 import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
-import org.springframework.data.util.Pair;

 @AllArgsConstructor
 @Slf4j
@@ -70,8 +60,6 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperations {

   FileIO fileIO;

-  SnapshotInspector snapshotInspector;
-
   HouseTableMapper houseTableMapper;

   TableIdentifier tableIdentifier;
@@ -80,6 +68,8 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperations {

   FileIOManager fileIOManager;

+  SnapshotDiffApplier snapshotDiffApplier;
+
   private static final Gson GSON = new Gson();

   private static final Cache<String, Integer> CACHE =
@@ -229,6 +219,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.EVOLVED_SCHEMA_KEY, true);
     }

+    metadata = snapshotDiffApplier.applySnapshots(base, metadata);
+
     int version = currentVersion() + 1;
     CommitStatus commitStatus = CommitStatus.FAILURE;

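SnapshotDiffApplier itself is new in this PR and its source is outside this excerpt; the diff only implies its surface: constructed with a MetricsReporter in OpenHouseInternalCatalog, and invoked once per commit with the last-committed metadata (base) and the incoming metadata. A sketch of that implied interface, with an assumed placeholder body; the real class absorbs the snapshot parsing, validation, append/delete, and branch-ref handling that the hunks below remove from doCommit:

import org.apache.iceberg.TableMetadata;

// Hypothetical sketch inferred from the two call sites in this diff; not the
// actual implementation shipped by the PR.
public class SnapshotDiffApplier {
  private final MetricsReporter metricsReporter; // project-internal metrics wrapper

  public SnapshotDiffApplier(MetricsReporter metricsReporter) {
    this.metricsReporter = metricsReporter;
  }

  // Diffs the snapshots and refs between base and metadata, validates the
  // changes, and returns rebuilt TableMetadata with appends, deletes, and
  // cherry-picks applied. Placeholder body only.
  public TableMetadata applySnapshots(TableMetadata base, TableMetadata metadata) {
    return metadata;
  }
}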
@@ -260,8 +252,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     if (properties.containsKey(CatalogConstants.EVOLVED_SCHEMA_KEY)) {
       properties.remove(CatalogConstants.EVOLVED_SCHEMA_KEY);
     }
-    String serializedSnapshotsToPut = properties.remove(CatalogConstants.SNAPSHOTS_JSON_KEY);
-    String serializedSnapshotRefs = properties.remove(CatalogConstants.SNAPSHOTS_REFS_KEY);
     boolean isStageCreate =
         Boolean.parseBoolean(properties.remove(CatalogConstants.IS_STAGE_CREATE_KEY));
     String sortOrderJson = properties.remove(CatalogConstants.SORT_ORDER_KEY);
@@ -274,24 +264,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       updatedMetadata = updatedMetadata.replaceSortOrder(sortOrder);
     }

-    if (serializedSnapshotsToPut != null) {
-      List<Snapshot> snapshotsToPut =
-          SnapshotsUtil.parseSnapshots(fileIO, serializedSnapshotsToPut);
-      Pair<List<Snapshot>, List<Snapshot>> snapshotsDiff =
-          SnapshotsUtil.symmetricDifferenceSplit(snapshotsToPut, updatedMetadata.snapshots());
-      List<Snapshot> appendedSnapshots = snapshotsDiff.getFirst();
-      List<Snapshot> deletedSnapshots = snapshotsDiff.getSecond();
-      snapshotInspector.validateSnapshotsUpdate(
-          updatedMetadata, appendedSnapshots, deletedSnapshots);
-      Map<String, SnapshotRef> snapshotRefs =
-          serializedSnapshotRefs == null
-              ? new HashMap<>()
-              : SnapshotsUtil.parseSnapshotRefs(serializedSnapshotRefs);
-      updatedMetadata =
-          maybeAppendSnapshots(updatedMetadata, appendedSnapshots, snapshotRefs, true);
-      updatedMetadata = maybeDeleteSnapshots(updatedMetadata, deletedSnapshots);
-    }
-
     final TableMetadata updatedMtDataRef = updatedMetadata;
     long metadataUpdateStartTime = System.currentTimeMillis();
     try {
@@ -531,125 +503,6 @@ private void failIfRetryUpdate(Map<String, String> properties) {
     }
   }

-  public TableMetadata maybeDeleteSnapshots(
-      TableMetadata metadata, List<Snapshot> snapshotsToDelete) {
-    TableMetadata result = metadata;
-    if (CollectionUtils.isNotEmpty(snapshotsToDelete)) {
-      Set<Long> snapshotIds =
-          snapshotsToDelete.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-      Map<String, String> updatedProperties = new HashMap<>(result.properties());
-      updatedProperties.put(
-          getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS),
-          snapshotsToDelete.stream()
-              .map(s -> Long.toString(s.snapshotId()))
-              .collect(Collectors.joining(",")));
-      result =
-          TableMetadata.buildFrom(result)
-              .setProperties(updatedProperties)
-              .build()
-              .removeSnapshotsIf(s -> snapshotIds.contains(s.snapshotId()));
-      metricsReporter.count(
-          InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR, snapshotsToDelete.size());
-    }
-    return result;
-  }
-
-  public TableMetadata maybeAppendSnapshots(
-      TableMetadata metadata,
-      List<Snapshot> snapshotsToAppend,
-      Map<String, SnapshotRef> snapshotRefs,
-      boolean recordAction) {
-    TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(metadata);
-    List<String> appendedSnapshots = new ArrayList<>();
-    List<String> stagedSnapshots = new ArrayList<>();
-    List<String> cherryPickedSnapshots = new ArrayList<>();
-    // Throw an exception if client sent request that included non-main branches in the
-    // snapshotRefs.
-    for (Map.Entry<String, SnapshotRef> entry : snapshotRefs.entrySet()) {
-      if (!entry.getKey().equals(SnapshotRef.MAIN_BRANCH)) {
-        throw new UnsupportedOperationException("OpenHouse supports only MAIN branch");
-      }
-    }
-    /**
-     * First check if there are new snapshots to be appended to current TableMetadata. If yes,
-     * following are the cases to be handled:
-     *
-     * <p>[1] A regular (non-wap) snapshot is being added to the MAIN branch.
-     *
-     * <p>[2] A staged (wap) snapshot is being created on top of current snapshot as its base.
-     * Recognized by STAGED_WAP_ID_PROP.
-     *
-     * <p>[3] A staged (wap) snapshot is being cherry picked to the MAIN branch wherein current
-     * snapshot in the MAIN branch is not the same as the base snapshot the staged (wap) snapshot
-     * was created on. Recognized by SOURCE_SNAPSHOT_ID_PROP. This case is called non-fast forward
-     * cherry pick.
-     *
-     * <p>In case no new snapshots are to be appended to current TableMetadata, there could be a
-     * cherrypick of a staged (wap) snapshot on top of the current snapshot in the MAIN branch which
-     * is the same as the base snapshot the staged (wap) snapshot was created on. This case is
-     * called fast forward cherry pick.
-     */
-    if (CollectionUtils.isNotEmpty(snapshotsToAppend)) {
-      for (Snapshot snapshot : snapshotsToAppend) {
-        snapshotInspector.validateSnapshot(snapshot);
-        if (snapshot.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP)) {
-          // a stage only snapshot using wap.id
-          metadataBuilder.addSnapshot(snapshot);
-          stagedSnapshots.add(String.valueOf(snapshot.snapshotId()));
-        } else if (snapshot.summary().containsKey(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)) {
-          // a snapshot created on a non fast-forward cherry-pick snapshot
-          metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
-          appendedSnapshots.add(String.valueOf(snapshot.snapshotId()));
-          cherryPickedSnapshots.add(
-              String.valueOf(snapshot.summary().get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)));
-        } else {
-          // a regular snapshot
-          metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
-          appendedSnapshots.add(String.valueOf(snapshot.snapshotId()));
-        }
-      }
-    } else if (MapUtils.isNotEmpty(snapshotRefs)) {
-      // Updated ref in the main branch with no new snapshot means this is a
-      // fast-forward cherry-pick or rollback operation.
-      long newSnapshotId = snapshotRefs.get(SnapshotRef.MAIN_BRANCH).snapshotId();
-      // Either the current snapshot is null or the current snapshot is not equal
-      // to the new snapshot indicates an update. The first case happens when the
-      // stage/wap snapshot being cherry-picked is the first snapshot.
-      if (MapUtils.isEmpty(metadata.refs())
-          || metadata.refs().get(SnapshotRef.MAIN_BRANCH).snapshotId() != newSnapshotId) {
-        metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH);
-        cherryPickedSnapshots.add(String.valueOf(newSnapshotId));
-      }
-    }
-    if (recordAction) {
-      Map<String, String> updatedProperties = new HashMap<>(metadata.properties());
-      if (CollectionUtils.isNotEmpty(appendedSnapshots)) {
-        updatedProperties.put(
-            getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS),
-            appendedSnapshots.stream().collect(Collectors.joining(",")));
-        metricsReporter.count(
-            InternalCatalogMetricsConstant.SNAPSHOTS_ADDED_CTR, appendedSnapshots.size());
-      }
-      if (CollectionUtils.isNotEmpty(stagedSnapshots)) {
-        updatedProperties.put(
-            getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS),
-            stagedSnapshots.stream().collect(Collectors.joining(",")));
-        metricsReporter.count(
-            InternalCatalogMetricsConstant.SNAPSHOTS_STAGED_CTR, stagedSnapshots.size());
-      }
-      if (CollectionUtils.isNotEmpty(cherryPickedSnapshots)) {
-        updatedProperties.put(
-            getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS),
-            cherryPickedSnapshots.stream().collect(Collectors.joining(",")));
-        metricsReporter.count(
-            InternalCatalogMetricsConstant.SNAPSHOTS_CHERRY_PICKED_CTR,
-            cherryPickedSnapshots.size());
-      }
-      metadataBuilder.setProperties(updatedProperties);
-    }
-    return metadataBuilder.build();
-  }
-
   /** Helper function to dump contents for map in debugging mode. */
   private void logPropertiesMap(Map<String, String> map) {
     log.debug(" === Printing the table properties within doCommit method === ");
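
For context on the cases the removed javadoc enumerates, this is how an Iceberg client typically produces each one; a sketch using stock Iceberg APIs, with hypothetical table, data file, and snapshot-id values:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Client-side operations that yield the snapshot shapes the commit path must
// distinguish. All identifiers here are illustrative.
class WapFlows {

  // Case [1]: a regular (non-wap) snapshot appended directly to the MAIN branch.
  static void regularAppend(Table table, DataFile file) {
    table.newAppend().appendFile(file).commit();
  }

  // Case [2]: a staged (wap) snapshot: written and stored, but referenced by no
  // branch; its summary carries "wap.id" (SnapshotSummary.STAGED_WAP_ID_PROP).
  static void stageForAudit(Table table, DataFile file) {
    table.newAppend().appendFile(file).set("wap.id", "audit-job-42").stageOnly().commit();
  }

  // Case [3] or fast-forward: publish the audited snapshot onto MAIN. If MAIN
  // still points at the staged snapshot's base, the ref simply fast-forwards;
  // otherwise Iceberg writes a new snapshot whose summary carries
  // "source-snapshot-id" (SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP).
  static void publish(Table table, long stagedSnapshotId) {
    table.manageSnapshots().cherrypick(stagedSnapshotId).commit();
  }
}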