
Commit ca0936b

Handle replication process compatible table creation in tables-service (#364)
## Summary

Issue: The create-table process does not handle table properties for tables that require replication compatibility. Such tables require the `lastUpdatedMillis` timestamp to follow the lineage of the primary table. Since the primary table is always created before the replica table, snapshots replicated from the primary can carry timestamps older than `replicaTable.lastUpdatedMillis`. Commits on the replica that contain snapshots from the primary table then fail, because the commit process requires `snapshot.timestamp > table.lastUpdatedMillis`.

Fix: Allow the table creation process to set the `lastUpdatedMillis` field in table metadata by passing a new table property, `last-updated-ms`. This is only applicable for tables requiring replication (the table must have the `openhouse.isTableReplicated` property set to true).

- [ ] Client-facing API Changes
- [x] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

Docker testing:

1. Create replica table with isReplicatedProperty = true: passed; `last-updated-ms` (`lastUpdatedMillis`) is reflected in table metadata.
2. Create primary table with isReplicatedProperty = true: passed; `last-updated-ms` (`lastUpdatedMillis`) is reflected in table metadata.
3. Create primary table with isReplicatedProperty = false: passed; `last-updated-ms` (`lastUpdatedMillis`) is set to the current timestamp.

CRUD operations on the table succeed:

```
scala> spark.sql("insert into db.active_replica6 values('a', 'b')").show()
++
||
++
++

scala> spark.sql("select * from db.active_replica6").show()
+---+----+
| id|name|
+---+----+
|  a|   b|
+---+----+

scala> spark.sql("show tblProperties db.active_replica6").show(100, false)
+------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
|key                                       |value                                                                                                                            |
+------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
|openhouse.lastModifiedTime                |1755215715485                                                                                                                    |
|openhouse.tableUUID                       |2a02f33a-ad9b-4677-8c57-8e3b8326567d                                                                                             |
|openhouse.tableUri                        |LocalHadoopCluster.db.active_replica6                                                                                            |
|openhouse.creationTime                    |1755134384295                                                                                                                    |
|openhouse.tableId                         |active_replica6                                                                                                                  |
|write.metadata.delete-after-commit.enabled|true                                                                                                                             |
|key                                       |value                                                                                                                            |
|openhouse.tableLocation                   |/data/openhouse/db/active_replica6-2a02f33a-ad9b-4677-8c57-8e3b8326567d/00001-d282fa1a-684f-4370-aaf1-6fc4c0911a6d.metadata.json |
|write.metadata.previous-versions-max      |28                                                                                                                               |
|openhouse.tableCreator                    |openhouse                                                                                                                        |
|openhouse.tableType                       |PRIMARY_TABLE                                                                                                                    |
|policies                                  |                                                                                                                                 |
|openhouse.databaseId                      |db                                                                                                                               |
|openhouse.isTableReplicated               |true                                                                                                                             |
|openhouse.clusterId                       |LocalHadoopCluster                                                                                                               |
|format                                    |iceberg/orc                                                                                                                      |
|openhouse.tableVersion                    |/data/openhouse/db/active_replica6-2a02f33a-ad9b-4677-8c57-8e3b8326567d/00000-bb85c6c7-b879-4d85-9cf1-71cee6e81223.metadata.json |
|current-snapshot-id                       |1205631819648875310                                                                                                              |
|last-updated-ms                           |1755134384295                                                                                                                    |
|write.format.default                      |orc                                                                                                                              |
|write.parquet.compression-codec           |zstd                                                                                                                             |
|openhouse.appended_snapshots              |1205631819648875310                                                                                                              |
+------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+

scala> spark.sql("drop table db.active_replica6").show()
++
||
++
++

scala> spark.sql("drop table db.active_replica7").show()
++
||
++
++
```

<img width="1115" height="896" alt="Screenshot 2025-08-13 at 6 21 38 PM" src="https://github.com/user-attachments/assets/5cad1ebb-f139-4994-8e14-da4e0e26a2e0" />

- [x] Manually Tested on local docker setup. Please include commands ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.
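
The snippet below is a hypothetical sketch of how a replication process could pass the new property at create time, assuming an Iceberg `Catalog` handle to the OpenHouse catalog; the schema, table identifier, and `primaryLastUpdatedMillis` argument are illustrative, and only the two property keys come from this commit's `CatalogConstants`.

```java
// Hypothetical sketch (not part of this commit): seed the replica's lastUpdatedMillis
// from the primary table at create time, assuming table creation goes through an
// Iceberg Catalog handle. Schema, identifier, and timestamp value are illustrative.
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class ReplicaTableCreateSketch {
  public static void createReplica(Catalog catalog, long primaryLastUpdatedMillis) {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.StringType.get()),
            Types.NestedField.required(2, "name", Types.StringType.get()));

    Map<String, String> properties = new HashMap<>();
    // Marks the table as replication-compatible; see OPENHOUSE_IS_TABLE_REPLICATED_KEY.
    properties.put("openhouse.isTableReplicated", "true");
    // Seeds lastUpdatedMillis from the primary table instead of the replica's wall-clock
    // creation time, so snapshots copied from the primary pass the timestamp check.
    properties.put("last-updated-ms", String.valueOf(primaryLastUpdatedMillis));

    catalog.createTable(
        TableIdentifier.of("db", "active_replica6"),
        schema,
        PartitionSpec.unpartitioned(),
        properties);
  }
}
```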
1 parent 40e69dd commit ca0936b

File tree

12 files changed: +464 −13 lines changed


cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorageClient.java

Lines changed: 14 additions & 0 deletions
@@ -5,6 +5,7 @@
 import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
 import java.io.IOException;
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -68,4 +69,17 @@ public boolean exists(String path) {
           "Exception checking path existence " + e.getMessage(), e);
     }
   }
+
+  /** Clean up resources when the bean is destroyed. */
+  @PreDestroy
+  public void cleanup() {
+    if (fs != null) {
+      try {
+        log.info("Closing FileSystem for HdfsStorageClient");
+        fs.close();
+      } catch (IOException e) {
+        log.error("Error closing FileSystem during HdfsStorageClient cleanup", e);
+      }
+    }
+  }
 }

cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/local/LocalStorageClient.java

Lines changed: 14 additions & 0 deletions
@@ -9,6 +9,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
@@ -150,4 +151,17 @@ public boolean exists(String path) {
           "Exception checking path existence " + e.getMessage(), e);
     }
   }
+
+  /** Clean up resources when the bean is destroyed. */
+  @PreDestroy
+  public void cleanup() {
+    if (fs != null) {
+      try {
+        log.info("Closing FileSystem for LocalStorageClient");
+        fs.close();
+      } catch (IOException e) {
+        log.error("Error closing FileSystem during LocalStorageClient cleanup", e);
+      }
+    }
+  }
 }

cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/s3/S3StorageClient.java

Lines changed: 14 additions & 0 deletions
@@ -9,6 +9,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.aws.AwsClientFactories;
@@ -99,4 +100,17 @@ public boolean exists(String path) {
           "Error checking S3 object existence: " + e.getMessage(), e);
     }
   }
+
+  /** Clean up resources when the bean is destroyed. */
+  @PreDestroy
+  public void cleanup() {
+    if (s3 != null) {
+      try {
+        log.info("Closing S3Client for S3StorageClient");
+        s3.close();
+      } catch (Exception e) {
+        log.error("Error closing S3Client during S3StorageClient cleanup", e);
+      }
+    }
+  }
 }

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/CatalogConstants.java

Lines changed: 2 additions & 0 deletions
@@ -6,13 +6,15 @@ public final class CatalogConstants {
   public static final String SNAPSHOTS_REFS_KEY = "snapshotsRefs";
   public static final String SORT_ORDER_KEY = "sortOrder";
   public static final String IS_STAGE_CREATE_KEY = "isStageCreate";
+  public static final String OPENHOUSE_TABLE_VERSION = "openhouse.tableVersion";
   public static final String OPENHOUSE_UUID_KEY = "openhouse.tableUUID";
   public static final String OPENHOUSE_TABLEID_KEY = "openhouse.tableId";
   public static final String OPENHOUSE_DATABASEID_KEY = "openhouse.databaseId";
   public static final String OPENHOUSE_IS_TABLE_REPLICATED_KEY = "openhouse.isTableReplicated";
   public static final String OPENHOUSE_TABLEURI_KEY = "openhouse.tableUri";
   public static final String OPENHOUSE_CLUSTERID_KEY = "openhouse.clusterId";
   public static final String INITIAL_VERSION = "INITIAL_VERSION";
+  public static final String LAST_UPDATED_MS = "last-updated-ms";
   public static final String APPENDED_SNAPSHOTS = "appended_snapshots";
   public static final String STAGED_SNAPSHOTS = "staged_snapshots";
   public static final String CHERRY_PICKED_SNAPSHOTS = "cherry_picked_snapshots";

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java

Lines changed: 2 additions & 1 deletion
@@ -73,7 +73,8 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
         snapshotInspector,
         houseTableMapper,
         tableIdentifier,
-        new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()));
+        new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()),
+        fileIOManager);
   }

   @Override

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java

Lines changed: 77 additions & 5 deletions
@@ -6,14 +6,21 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.gson.Gson;
 import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
+import com.linkedin.openhouse.cluster.storage.Storage;
+import com.linkedin.openhouse.cluster.storage.StorageClient;
+import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient;
+import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient;
 import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
+import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
 import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
 import com.linkedin.openhouse.internal.catalog.model.HouseTable;
 import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
 import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
 import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableCallerException;
 import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableConcurrentUpdateException;
 import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableNotFoundException;
+import com.linkedin.openhouse.internal.catalog.utils.MetadataUpdateUtils;
+import java.io.IOException;
 import java.time.Clock;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -29,6 +36,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -70,6 +78,8 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio

   MetricsReporter metricsReporter;

+  FileIOManager fileIOManager;
+
   private static final Gson GSON = new Gson();

   private static final Cache<String, Integer> CACHE =
@@ -222,17 +232,22 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     Map<String, String> properties = new HashMap<>(metadata.properties());
     failIfRetryUpdate(properties);

-    String currentTsString = String.valueOf(Instant.now(Clock.systemUTC()).toEpochMilli());
-    properties.put(getCanonicalFieldName("lastModifiedTime"), currentTsString);
-    if (base == null) {
-      properties.put(getCanonicalFieldName("creationTime"), currentTsString);
-    }
     properties.put(
         getCanonicalFieldName("tableVersion"),
         properties.getOrDefault(
            getCanonicalFieldName("tableLocation"), CatalogConstants.INITIAL_VERSION));
     properties.put(getCanonicalFieldName("tableLocation"), newMetadataLocation);

+    String currentTsString = String.valueOf(Instant.now(Clock.systemUTC()).toEpochMilli());
+    if (isReplicatedTableCreate(properties)) {
+      currentTsString =
+          metadata.properties().getOrDefault(CatalogConstants.LAST_UPDATED_MS, currentTsString);
+    }
+    properties.put(getCanonicalFieldName("lastModifiedTime"), currentTsString);
+    if (base == null) {
+      properties.put(getCanonicalFieldName("creationTime"), currentTsString);
+    }
+
     if (properties.containsKey(CatalogConstants.EVOLVED_SCHEMA_KEY)) {
       properties.remove(CatalogConstants.EVOLVED_SCHEMA_KEY);
     }
@@ -301,7 +316,24 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
          */
         refreshFromMetadataLocation(newMetadataLocation);
       }
+      if (isReplicatedTableCreate(properties)) {
+        updateMetadataFieldForTable(metadata, newMetadataLocation);
+      }
       commitStatus = CommitStatus.SUCCESS;
+    } catch (IOException ioe) {
+      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      // clean up the HTS entry
+      try {
+        houseTableRepository.delete(houseTable);
+      } catch (HouseTableCallerException
+          | HouseTableNotFoundException
+          | HouseTableConcurrentUpdateException e) {
+        log.warn(
+            "Failed to delete house table during IOException cleanup for table: {}",
+            tableIdentifier,
+            e);
+      }
+      throw new CommitFailedException(ioe);
     } catch (InvalidIcebergSnapshotException e) {
       throw new BadRequestException(e, e.getMessage());
     } catch (CommitFailedException e) {
@@ -614,4 +646,44 @@ private String[] getCatalogMetricTags() {
       InternalCatalogMetricsConstant.DATABASE_TAG, tableIdentifier.namespace().toString()
     };
   }
+
+  /**
+   * Updates metadata field for staged tables by extracting updateTimeStamp from metadata.properties
+   * and updating the metadata file. Should be used only for replicated table.
+   *
+   * @param metadata The table metadata containing properties
+   */
+  private void updateMetadataFieldForTable(TableMetadata metadata, String tableLocation)
+      throws IOException {
+    String updateTimeStamp = metadata.properties().get(CatalogConstants.LAST_UPDATED_MS);
+    if (updateTimeStamp != null) {
+      Storage storage = fileIOManager.getStorage(fileIO);
+      // Support only for HDFS Storage and local storage clients
+      if (storage != null
+          && (storage.getClient() instanceof HdfsStorageClient
+              || storage.getClient() instanceof LocalStorageClient)) {
+        StorageClient<?> client = storage.getClient();
+        FileSystem fs = (FileSystem) client.getNativeClient();
+        if (tableLocation != null) {
+          MetadataUpdateUtils.updateMetadataField(
+              fs, tableLocation, CatalogConstants.LAST_UPDATED_MS, Long.valueOf(updateTimeStamp));
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if the properties have field values indicating a replicated table create request
+   *
+   * @param properties
+   * @return
+   */
+  private boolean isReplicatedTableCreate(Map<String, String> properties) {
+    return Boolean.parseBoolean(
+            properties.getOrDefault(CatalogConstants.OPENHOUSE_IS_TABLE_REPLICATED_KEY, "false"))
+        && properties
+            .getOrDefault(
+                CatalogConstants.OPENHOUSE_TABLE_VERSION, CatalogConstants.INITIAL_VERSION)
+            .equals(CatalogConstants.INITIAL_VERSION);
+  }
 }
iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/utils/MetadataUpdateUtils.java

Lines changed: 108 additions & 0 deletions

@@ -0,0 +1,108 @@
+package com.linkedin.openhouse.internal.catalog.utils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Utility class for updating metadata files in HDFS. Provides methods to read, modify, and write
+ * JSON metadata files.
+ */
+@Slf4j
+public class MetadataUpdateUtils {
+
+  /**
+   * Updates a specific field in a metadata file located at the given HDFS path.
+   *
+   * @param fs The Hadoop FileSystem instance
+   * @param hdfsPath The path to the metadata file in HDFS
+   * @param fieldName The name of the field to update
+   * @param updatedValue The new value for the field
+   * @throws IOException if there's an error reading or writing the file
+   */
+  public static void updateMetadataField(
+      FileSystem fs, String hdfsPath, String fieldName, Long updatedValue) throws IOException {
+    try {
+      InputStream inputStream = fs.open(new Path(hdfsPath));
+      String jsonString = readInputStream(inputStream);
+      IOUtils.closeStream(inputStream);
+
+      String updatedJsonString = updateJsonField(jsonString, fieldName, updatedValue);
+
+      OutputStream outputStream = fs.create(new Path(hdfsPath), true);
+      writeOutputStream(outputStream, updatedJsonString);
+      IOUtils.closeStream(outputStream);
+
+      log.info("Updated json metadata at path: {}", hdfsPath);
+    } catch (IOException e) {
+      String errMsg =
+          String.format(
+              "Failed to update metadata file at path: %s. Error: %s", hdfsPath, e.getMessage());
+      log.error(errMsg);
+      throw new IOException(errMsg);
+    }
+  }
+
+  /**
+   * Reads the content of an InputStream and returns it as a String.
+   *
+   * @param inputStream The InputStream to read from
+   * @return The content as a String
+   * @throws IOException if there's an error reading the stream
+   */
+  private static String readInputStream(InputStream inputStream) throws IOException {
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+      StringBuilder content = new StringBuilder();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        content.append(line);
+      }
+      return content.toString();
+    }
+  }
+
+  /**
+   * Writes content to an OutputStream.
+   *
+   * @param outputStream The OutputStream to write to
+   * @param content The content to write
+   * @throws IOException if there's an error writing to the stream
+   */
+  private static void writeOutputStream(OutputStream outputStream, String content)
+      throws IOException {
+    try (BufferedWriter writer =
+        new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
+      writer.write(content);
+    }
+  }
+
+  /**
+   * Updates a specific field in a JSON string with a new value.
+   *
+   * @param jsonString The JSON string to modify
+   * @param fieldName The name of the field to update
+   * @param updatedValue The new value for the field
+   * @return The updated JSON string with pretty printing
+   * @throws IOException if there's an error parsing or writing the JSON
+   */
+  private static String updateJsonField(String jsonString, String fieldName, long updatedValue)
+      throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonNode jsonNode = objectMapper.readTree(jsonString);
+    ((ObjectNode) jsonNode).put(fieldName, updatedValue);
+    return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode);
+  }
+}
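
As a usage illustration only (in this commit the utility is invoked from `updateMetadataFieldForTable` during replicated-table creation), here is a minimal sketch of calling `updateMetadataField` against a local Hadoop `FileSystem`; the metadata path and timestamp below are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import com.linkedin.openhouse.internal.catalog.utils.MetadataUpdateUtils;

public class MetadataUpdateSketch {
  public static void main(String[] args) throws Exception {
    // A local FileSystem stands in for HDFS; the path is a placeholder metadata file.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Rewrites the "last-updated-ms" field of the JSON metadata file in place.
    MetadataUpdateUtils.updateMetadataField(
        fs,
        "/tmp/openhouse/db/example_table/00000-example.metadata.json",
        "last-updated-ms",
        1755134384295L);
    fs.close();
  }
}
```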
