
Commit 61776c2

Authored by srawat98-dev, srawat, teamurko, and sumedhsakdeo
Implementation[openhouseTableCommitEvents]: Commit job for freshness in TableStatsCollectionSparkApp (#398)
## Summary

I extended the existing TableStatsCollectionSparkApp to implement the logic for populating the openhouseTableCommitEvents table. This new table will serve as the single source of truth for commit-related metadata across all OpenHouse datasets, including:

- Commit ID
- Commit timestamp
- Commit operation
- Spark app ID
- Spark app name

This enables a unified, consistent, and efficient way to access commit events for all OpenHouse tables.

## Output / Result

1. This PR populates the openhouseTableCommitEvents table by pushing commit events from the Snapshot Metadata table for all OpenHouse datasets.
2. It creates one row per commit across all OpenHouse tables.
3. The table will be updated daily via the TableStatsCollection job.
4. At every scheduled run, we process all active (non-expired) commit events in the Snapshot Metadata table.
5. Every partition will therefore contain commit events for all snapshots that are non-expired at the time of the job run.
6. This produces many duplicates across partitions, but they can be handled at query time in the downstream consumer (see the dedup sketch after the diffs below).

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

## Testing Done

- [ ] Manually tested on a local Docker setup.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated.
- [ ] Some other form of testing, like staging or soak time in production.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

---------

Co-authored-by: srawat <[email protected]>
Co-authored-by: Stas Pak <[email protected]>
Co-authored-by: Sumedh Sakdeo <[email protected]>
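For orientation, here is a minimal sketch of what the CommitEventTable model referenced in this PR might look like, based on the fields listed in the summary. Only the eventTimestampMs setter is confirmed by the diff below; the class body and remaining field names are assumptions for illustration.

```java
package com.linkedin.openhouse.common.stats.model;

import lombok.Data;

/**
 * Hypothetical sketch of the commit-event model: one instance per commit.
 * Field names other than eventTimestampMs are assumptions based on the
 * PR summary, not the actual class in the repository.
 */
@Data
public class CommitEventTable {
  private String tableName;       // fully-qualified table name (assumed field)
  private long commitId;          // commit / snapshot ID
  private long commitTimestampMs; // commit timestamp
  private String commitOperation; // e.g. append, overwrite, delete
  private String sparkAppId;      // Spark app ID that produced the commit
  private String sparkAppName;    // Spark app name
  private long eventTimestampMs;  // stamped at publish time (see publishCommitEvents)
}
```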
1 parent c03fae3 · commit 61776c2

File tree

6 files changed (+844, -19 lines)

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java

Lines changed: 27 additions & 4 deletions

```diff
@@ -4,13 +4,15 @@
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
+import com.linkedin.openhouse.common.stats.model.CommitEventTable;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import com.linkedin.openhouse.jobs.util.SparkJobUtil;
 import com.linkedin.openhouse.jobs.util.TableStatsCollector;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.time.ZonedDateTime;
 import java.time.temporal.ChronoUnit;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -530,15 +532,36 @@ static String partitionToString(StructLike partition) {
   public IcebergTableStats collectTableStats(String fqtn) {
     Table table = getTable(fqtn);
 
-    TableStatsCollector tableStatsCollector;
     try {
-      tableStatsCollector = new TableStatsCollector(fs(), spark, fqtn, table);
+      TableStatsCollector tableStatsCollector = new TableStatsCollector(fs(), spark, table);
+      return tableStatsCollector.collectTableStats();
     } catch (IOException e) {
       log.error("Unable to initialize file system for table stats collection", e);
       return null;
+    } catch (Exception e) {
+      log.error("Failed to collect table stats for table: {}", fqtn, e);
+      return null;
     }
+  }
 
-    IcebergTableStats tableStats = tableStatsCollector.collectTableStats();
-    return tableStats;
+  /**
+   * Collect commit events for a given fully-qualified table name.
+   *
+   * @param fqtn fully-qualified table name
+   * @return List of CommitEventTable objects (event_timestamp_ms will be set at publish time)
+   */
+  public List<CommitEventTable> collectCommitEventTable(String fqtn) {
+    Table table = getTable(fqtn);
+
+    try {
+      TableStatsCollector tableStatsCollector = new TableStatsCollector(fs(), spark, table);
+      return tableStatsCollector.collectCommitEventTable();
+    } catch (IOException e) {
+      log.error("Unable to initialize file system for commit events collection", e);
+      return Collections.emptyList();
+    } catch (Exception e) {
+      log.error("Failed to collect commit events for table: {}", fqtn, e);
+      return Collections.emptyList();
+    }
   }
 }
```

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java

Lines changed: 88 additions & 2 deletions

```diff
@@ -3,12 +3,15 @@
 import com.google.gson.Gson;
 import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
+import com.linkedin.openhouse.common.stats.model.CommitEventTable;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import com.linkedin.openhouse.jobs.spark.state.StateManager;
 import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -31,8 +34,46 @@ public TableStatsCollectionSparkApp(
   protected void runInner(Operations ops) {
     log.info("Running TableStatsCollectorApp for table {}", fqtn);
 
-    IcebergTableStats icebergTableStats = ops.collectTableStats(fqtn);
-    publishStats(icebergTableStats);
+    // Run stats collection and commit events collection in parallel
+    long startTime = System.currentTimeMillis();
+
+    CompletableFuture<IcebergTableStats> statsFuture =
+        executeWithTimingAsync(
+            "table stats collection",
+            () -> ops.collectTableStats(fqtn),
+            result -> String.format("%s", fqtn));
+
+    CompletableFuture<List<CommitEventTable>> commitEventsFuture =
+        executeWithTimingAsync(
+            "commit events collection",
+            () -> ops.collectCommitEventTable(fqtn),
+            result -> String.format("%s (%d events)", fqtn, result.size()));
+
+    // Wait for both to complete
+    CompletableFuture.allOf(statsFuture, commitEventsFuture).join();
+
+    long endTime = System.currentTimeMillis();
+    log.info(
+        "Total collection time for table: {} in {} ms (parallel execution)",
+        fqtn,
+        (endTime - startTime));
+
+    // Publish results
+    IcebergTableStats icebergTableStats = statsFuture.join();
+    if (icebergTableStats != null) {
+      publishStats(icebergTableStats);
+    } else {
+      log.warn("Skipping stats publishing for table: {} due to collection failure", fqtn);
+    }
+
+    List<CommitEventTable> commitEvents = commitEventsFuture.join();
+    if (commitEvents != null && !commitEvents.isEmpty()) {
+      publishCommitEvents(commitEvents);
+    } else {
+      log.warn(
+          "Skipping commit events publishing for table: {} due to collection failure or no events",
+          fqtn);
+    }
   }
 
   /**
@@ -45,12 +86,57 @@ protected void publishStats(IcebergTableStats icebergTableStats) {
     log.info(new Gson().toJson(icebergTableStats));
   }
 
+  /**
+   * Publish commit events. Override this method in li-openhouse to send to Kafka.
+   *
+   * @param commitEvents List of commit events to publish
+   */
+  protected void publishCommitEvents(List<CommitEventTable> commitEvents) {
+    // Set event timestamp at publish time
+    long eventTimestampInEpochMs = System.currentTimeMillis();
+    commitEvents.forEach(event -> event.setEventTimestampMs(eventTimestampInEpochMs));
+
+    log.info("Publishing commit events for table: {}", fqtn);
+    log.info(new Gson().toJson(commitEvents));
+  }
+
   public static void main(String[] args) {
     OtelEmitter otelEmitter =
         new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
     createApp(args, otelEmitter).run();
   }
 
+  /**
+   * Execute a supplier asynchronously with timing and logging.
+   *
+   * @param operationName Name of the operation for logging
+   * @param supplier The operation to execute
+   * @param resultFormatter Function to format the result for logging
+   * @param <T> Return type of the operation
+   * @return CompletableFuture wrapping the operation result
+   */
+  private <T> CompletableFuture<T> executeWithTimingAsync(
+      String operationName,
+      Supplier<T> supplier,
+      java.util.function.Function<T, String> resultFormatter) {
+    return CompletableFuture.supplyAsync(
+        () -> {
+          long startTime = System.currentTimeMillis();
+          log.info("Starting {} for table: {}", operationName, fqtn);
+          T result = supplier.get();
+          long endTime = System.currentTimeMillis();
+
+          String resultDescription =
+              (result != null) ? resultFormatter.apply(result) : "null (collection failed)";
+          log.info(
+              "Completed {} for table: {} in {} ms",
+              operationName,
+              resultDescription,
+              (endTime - startTime));
+          return result;
+        });
+  }
+
   public static TableStatsCollectionSparkApp createApp(String[] args, OtelEmitter otelEmitter) {
     List<Option> extraOptions = new ArrayList<>();
     extraOptions.add(new Option("t", "tableName", true, "Fully-qualified table name"));
```
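The Javadoc above notes that publishCommitEvents is meant to be overridden in li-openhouse to send events to Kafka. Below is a minimal, hypothetical sketch of what the body of such an override could look like; the helper class name, topic name, broker address, and JSON-string serialization are all assumptions for illustration, not the actual li-openhouse implementation.

```java
import com.google.gson.Gson;
import com.linkedin.openhouse.common.stats.model.CommitEventTable;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hypothetical helper: the body a Kafka-backed publishCommitEvents override might use. */
public final class CommitEventsKafkaPublisher {
  private CommitEventsKafkaPublisher() {}

  public static void publish(List<CommitEventTable> commitEvents) {
    // Stamp the publish-time event timestamp, mirroring the base implementation,
    // so all events from one run share a single event timestamp.
    long eventTimestampInEpochMs = System.currentTimeMillis();
    commitEvents.forEach(event -> event.setEventTimestampMs(eventTimestampInEpochMs));

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Gson gson = new Gson();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (CommitEventTable event : commitEvents) {
        // Topic name "openhouseTableCommitEvents" is an assumption for illustration.
        producer.send(new ProducerRecord<>("openhouseTableCommitEvents", gson.toJson(event)));
      }
      producer.flush(); // block until all sends have completed
    }
  }
}
```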

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollector.java

Lines changed: 17 additions & 5 deletions

```diff
@@ -1,6 +1,8 @@
 package com.linkedin.openhouse.jobs.util;
 
+import com.linkedin.openhouse.common.stats.model.CommitEventTable;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
+import java.util.List;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
@@ -14,7 +16,6 @@ public class TableStatsCollector {
 
   private FileSystem fs;
   private SparkSession spark;
-  String fqtn;
   Table table;
 
   /** Collect table stats. */
@@ -25,14 +26,25 @@ public IcebergTableStats collectTableStats() {
     TableStatsCollectorUtil.populateTableMetadata(table, stats);
     IcebergTableStats statsWithReferenceFiles =
         TableStatsCollectorUtil.populateStatsOfAllReferencedFiles(
-            fqtn, table, spark, statsWithMetadataData);
+            table, spark, statsWithMetadataData);
     IcebergTableStats statsWithCurrentSnapshot =
-        TableStatsCollectorUtil.populateStatsForSnapshots(
-            fqtn, table, spark, statsWithReferenceFiles);
+        TableStatsCollectorUtil.populateStatsForSnapshots(table, spark, statsWithReferenceFiles);
 
     IcebergTableStats tableStats =
-        TableStatsCollectorUtil.populateStorageStats(fqtn, table, fs, statsWithCurrentSnapshot);
+        TableStatsCollectorUtil.populateStorageStats(table, fs, statsWithCurrentSnapshot);
 
     return tableStats;
   }
+
+  /**
+   * Collect commit events for the table.
+   *
+   * <p>Note: Returns List (loads into memory). This is acceptable because Iceberg retention limits
+   * active snapshots to a manageable number (typically <10k per table).
+   *
+   * @return List of CommitEventTable objects (event_timestamp_ms will be set at publish time)
+   */
+  public List<CommitEventTable> collectCommitEventTable() {
+    return TableStatsCollectorUtil.populateCommitEventTable(table, spark);
+  }
 }
```
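As called out in point 6 of the summary, every scheduled run re-emits events for all non-expired snapshots, so the same commit appears in many partitions and deduplication is left to query time in the downstream consumer. Here is a minimal sketch of what such a read-time dedup could look like; the tableName and commitId column names are assumptions, since the actual schema is not shown in this diff.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CommitEventsDedupExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("commit-events-dedup").getOrCreate();

    // Each daily partition contains events for all snapshots that were
    // non-expired at run time, so a commit survives in every partition written
    // while its snapshot was active. Collapse to one row per (table, commit).
    Dataset<Row> events = spark.table("openhouseTableCommitEvents");
    Dataset<Row> deduped = events.dropDuplicates("tableName", "commitId");

    deduped.show();
    spark.stop();
  }
}
```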
