Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,24 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired StorageType storageType;

@Autowired SnapshotInspector snapshotInspector;

@Autowired HouseTableMapper houseTableMapper;

@Autowired MeterRegistry meterRegistry;

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
FileIO fileIO = resolveFileIO(tableIdentifier);
MetricsReporter metricsReporter =
new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList());
SnapshotDiffApplier snapshotDiffApplier = new SnapshotDiffApplier(metricsReporter);
return new OpenHouseInternalTableOperations(
houseTableRepository,
resolveFileIO(tableIdentifier),
snapshotInspector,
fileIO,
houseTableMapper,
tableIdentifier,
new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()),
fileIOManager);
metricsReporter,
fileIOManager,
snapshotDiffApplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,19 @@
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
Expand All @@ -60,7 +51,6 @@
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.springframework.data.util.Pair;

@AllArgsConstructor
@Slf4j
Expand All @@ -70,8 +60,6 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio

FileIO fileIO;

SnapshotInspector snapshotInspector;

HouseTableMapper houseTableMapper;

TableIdentifier tableIdentifier;
Expand All @@ -80,6 +68,8 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio

FileIOManager fileIOManager;

SnapshotDiffApplier snapshotDiffApplier;

private static final Gson GSON = new Gson();

private static final Cache<String, Integer> CACHE =
Expand Down Expand Up @@ -229,6 +219,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.EVOLVED_SCHEMA_KEY, true);
}

metadata = snapshotDiffApplier.applySnapshots(base, metadata);

int version = currentVersion() + 1;
CommitStatus commitStatus = CommitStatus.FAILURE;

Expand Down Expand Up @@ -260,8 +252,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (properties.containsKey(CatalogConstants.EVOLVED_SCHEMA_KEY)) {
properties.remove(CatalogConstants.EVOLVED_SCHEMA_KEY);
}
String serializedSnapshotsToPut = properties.remove(CatalogConstants.SNAPSHOTS_JSON_KEY);
String serializedSnapshotRefs = properties.remove(CatalogConstants.SNAPSHOTS_REFS_KEY);
boolean isStageCreate =
Boolean.parseBoolean(properties.remove(CatalogConstants.IS_STAGE_CREATE_KEY));
String sortOrderJson = properties.remove(CatalogConstants.SORT_ORDER_KEY);
Expand All @@ -274,24 +264,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updatedMetadata = updatedMetadata.replaceSortOrder(sortOrder);
}

if (serializedSnapshotsToPut != null) {
List<Snapshot> snapshotsToPut =
SnapshotsUtil.parseSnapshots(fileIO, serializedSnapshotsToPut);
Pair<List<Snapshot>, List<Snapshot>> snapshotsDiff =
SnapshotsUtil.symmetricDifferenceSplit(snapshotsToPut, updatedMetadata.snapshots());
List<Snapshot> appendedSnapshots = snapshotsDiff.getFirst();
List<Snapshot> deletedSnapshots = snapshotsDiff.getSecond();
snapshotInspector.validateSnapshotsUpdate(
updatedMetadata, appendedSnapshots, deletedSnapshots);
Map<String, SnapshotRef> snapshotRefs =
serializedSnapshotRefs == null
? new HashMap<>()
: SnapshotsUtil.parseSnapshotRefs(serializedSnapshotRefs);
updatedMetadata =
maybeAppendSnapshots(updatedMetadata, appendedSnapshots, snapshotRefs, true);
updatedMetadata = maybeDeleteSnapshots(updatedMetadata, deletedSnapshots);
}

final TableMetadata updatedMtDataRef = updatedMetadata;
long metadataUpdateStartTime = System.currentTimeMillis();
try {
Expand Down Expand Up @@ -531,125 +503,6 @@ private void failIfRetryUpdate(Map<String, String> properties) {
}
}

/**
 * Removes the given snapshots from {@code metadata}, if any were supplied.
 *
 * <p>The ids of the removed snapshots are recorded (comma-joined) under the canonical
 * DELETED_SNAPSHOTS table property so the deletion is visible to downstream consumers, and a
 * deletion counter metric is emitted.
 *
 * @param metadata current table metadata
 * @param snapshotsToDelete snapshots to prune; may be null or empty, in which case
 *     {@code metadata} is returned unchanged
 * @return metadata with the snapshots removed and the deletion recorded in properties
 */
public TableMetadata maybeDeleteSnapshots(
    TableMetadata metadata, List<Snapshot> snapshotsToDelete) {
  if (CollectionUtils.isEmpty(snapshotsToDelete)) {
    return metadata;
  }
  // Record which snapshot ids were dropped as a comma-separated table property.
  String deletedIdsCsv =
      snapshotsToDelete.stream()
          .map(snapshot -> Long.toString(snapshot.snapshotId()))
          .collect(Collectors.joining(","));
  Map<String, String> propertiesWithDeletions = new HashMap<>(metadata.properties());
  propertiesWithDeletions.put(
      getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS), deletedIdsCsv);
  Set<Long> idsToRemove =
      snapshotsToDelete.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
  // Rebuild with the updated properties first, then prune the matching snapshots.
  TableMetadata pruned =
      TableMetadata.buildFrom(metadata)
          .setProperties(propertiesWithDeletions)
          .build()
          .removeSnapshotsIf(snapshot -> idsToRemove.contains(snapshot.snapshotId()));
  metricsReporter.count(
      InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR, snapshotsToDelete.size());
  return pruned;
}

/**
 * Applies client-supplied snapshots and branch refs to {@code metadata}.
 *
 * <p>Handles regular appends to MAIN, staged (WAP) snapshot creation, and both fast-forward and
 * non-fast-forward cherry-picks; see the case analysis in the body comment below. Only the MAIN
 * branch is supported — any other branch in {@code snapshotRefs} is rejected.
 *
 * @param metadata base table metadata to build upon
 * @param snapshotsToAppend snapshots not yet present in {@code metadata}
 * @param snapshotRefs branch refs sent by the client; keys other than MAIN are rejected
 * @param recordAction when true, record the applied snapshot ids in table properties
 *     (appended / staged / cherry-picked) and emit the corresponding counter metrics
 * @return the rebuilt table metadata
 * @throws UnsupportedOperationException if {@code snapshotRefs} contains a non-MAIN branch
 */
public TableMetadata maybeAppendSnapshots(
    TableMetadata metadata,
    List<Snapshot> snapshotsToAppend,
    Map<String, SnapshotRef> snapshotRefs,
    boolean recordAction) {
  TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(metadata);
  // Snapshot ids bucketed by how they were applied; used for property/metric bookkeeping below.
  List<String> appendedSnapshots = new ArrayList<>();
  List<String> stagedSnapshots = new ArrayList<>();
  List<String> cherryPickedSnapshots = new ArrayList<>();
  // Throw an exception if client sent request that included non-main branches in the
  // snapshotRefs.
  for (Map.Entry<String, SnapshotRef> entry : snapshotRefs.entrySet()) {
    if (!entry.getKey().equals(SnapshotRef.MAIN_BRANCH)) {
      throw new UnsupportedOperationException("OpenHouse supports only MAIN branch");
    }
  }
  /**
   * First check if there are new snapshots to be appended to current TableMetadata. If yes,
   * following are the cases to be handled:
   *
   * <p>[1] A regular (non-wap) snapshot is being added to the MAIN branch.
   *
   * <p>[2] A staged (wap) snapshot is being created on top of current snapshot as its base.
   * Recognized by STAGED_WAP_ID_PROP.
   *
   * <p>[3] A staged (wap) snapshot is being cherry picked to the MAIN branch wherein current
   * snapshot in the MAIN branch is not the same as the base snapshot the staged (wap) snapshot
   * was created on. Recognized by SOURCE_SNAPSHOT_ID_PROP. This case is called non-fast forward
   * cherry pick.
   *
   * <p>In case no new snapshots are to be appended to current TableMetadata, there could be a
   * cherrypick of a staged (wap) snapshot on top of the current snapshot in the MAIN branch which
   * is the same as the base snapshot the staged (wap) snapshot was created on. This case is
   * called fast forward cherry pick.
   */
  if (CollectionUtils.isNotEmpty(snapshotsToAppend)) {
    for (Snapshot snapshot : snapshotsToAppend) {
      // Each incoming snapshot is validated before being applied to the builder.
      snapshotInspector.validateSnapshot(snapshot);
      if (snapshot.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP)) {
        // a stage only snapshot using wap.id
        metadataBuilder.addSnapshot(snapshot);
        stagedSnapshots.add(String.valueOf(snapshot.snapshotId()));
      } else if (snapshot.summary().containsKey(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)) {
        // a snapshot created on a non fast-forward cherry-pick snapshot
        metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
        appendedSnapshots.add(String.valueOf(snapshot.snapshotId()));
        cherryPickedSnapshots.add(
            String.valueOf(snapshot.summary().get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)));
      } else {
        // a regular snapshot
        metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
        appendedSnapshots.add(String.valueOf(snapshot.snapshotId()));
      }
    }
  } else if (MapUtils.isNotEmpty(snapshotRefs)) {
    // Updated ref in the main branch with no new snapshot means this is a
    // fast-forward cherry-pick or rollback operation.
    long newSnapshotId = snapshotRefs.get(SnapshotRef.MAIN_BRANCH).snapshotId();
    // Either the current snapshot is null or the current snapshot is not equal
    // to the new snapshot indicates an update. The first case happens when the
    // stage/wap snapshot being cherry-picked is the first snapshot.
    if (MapUtils.isEmpty(metadata.refs())
        || metadata.refs().get(SnapshotRef.MAIN_BRANCH).snapshotId() != newSnapshotId) {
      metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH);
      cherryPickedSnapshots.add(String.valueOf(newSnapshotId));
    }
  }
  if (recordAction) {
    // Persist the ids of applied snapshots as table properties (comma-joined per category)
    // and emit one counter metric per non-empty category.
    Map<String, String> updatedProperties = new HashMap<>(metadata.properties());
    if (CollectionUtils.isNotEmpty(appendedSnapshots)) {
      updatedProperties.put(
          getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS),
          appendedSnapshots.stream().collect(Collectors.joining(",")));
      metricsReporter.count(
          InternalCatalogMetricsConstant.SNAPSHOTS_ADDED_CTR, appendedSnapshots.size());
    }
    if (CollectionUtils.isNotEmpty(stagedSnapshots)) {
      updatedProperties.put(
          getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS),
          stagedSnapshots.stream().collect(Collectors.joining(",")));
      metricsReporter.count(
          InternalCatalogMetricsConstant.SNAPSHOTS_STAGED_CTR, stagedSnapshots.size());
    }
    if (CollectionUtils.isNotEmpty(cherryPickedSnapshots)) {
      updatedProperties.put(
          getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS),
          cherryPickedSnapshots.stream().collect(Collectors.joining(",")));
      metricsReporter.count(
          InternalCatalogMetricsConstant.SNAPSHOTS_CHERRY_PICKED_CTR,
          cherryPickedSnapshots.size());
    }
    metadataBuilder.setProperties(updatedProperties);
  }
  return metadataBuilder.build();
}

/** Helper function to dump contents for map in debugging mode. */
private void logPropertiesMap(Map<String, String> map) {
log.debug(" === Printing the table properties within doCommit method === ");
Expand Down
Loading
Loading