Operations.java
@@ -5,6 +5,7 @@
 import com.google.gson.Gson;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
+import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import com.linkedin.openhouse.jobs.util.SparkJobUtil;
 import com.linkedin.openhouse.jobs.util.TableStatsCollector;
@@ -564,4 +565,29 @@ public List<CommitEventTable> collectCommitEventTable(String fqtn) {
       return Collections.emptyList();
     }
   }
+
+  /**
+   * Collect partition-level commit events for a given fully-qualified table name.
+   *
+   * <p>Returns one record per (commit_id, partition) pair. Returns an empty list for unpartitioned
+   * tables or on errors.
+   *
+   * @param fqtn fully-qualified table name
+   * @return List of CommitEventTablePartitions objects (event_timestamp_ms will be set at publish
+   *     time)
+   */
+  public List<CommitEventTablePartitions> collectCommitEventTablePartitions(String fqtn) {
+    Table table = getTable(fqtn);
+
+    try {
+      TableStatsCollector tableStatsCollector = new TableStatsCollector(fs(), spark, table);
+      return tableStatsCollector.collectCommitEventTablePartitions();
+    } catch (IOException e) {
+      log.error("Unable to initialize file system for partition events collection", e);
+      return Collections.emptyList();
+    } catch (Exception e) {
+      log.error("Failed to collect partition events for table: {}", fqtn, e);
+      return Collections.emptyList();
+    }
+  }
 }
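
Note: `CommitEventTablePartitions` is referenced throughout this diff but its definition is not included. Below is a minimal sketch of what the model might look like, assuming Lombok accessors like the sibling `CommitEventTable`; only `setEventTimestampMs` is confirmed by this diff, and the remaining fields are inferred from the "one record per (commit_id, partition) pair" contract.

```java
package com.linkedin.openhouse.common.stats.model;

import lombok.Data;

// Hypothetical sketch; the real model is not shown in this PR.
@Data
public class CommitEventTablePartitions {
  private String tableName; // assumed: fully-qualified table name
  private long commitId; // assumed: Iceberg snapshot id of the commit
  private String partition; // assumed: partition path, e.g. "datepartition=2024-01-01"
  private Long eventTimestampMs; // confirmed by the diff: set at publish time
}
```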
TableStatsCollectionSparkApp.java
@@ -4,6 +4,7 @@
 import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
+import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import com.linkedin.openhouse.jobs.spark.state.StateManager;
 import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
@@ -34,7 +35,7 @@ public TableStatsCollectionSparkApp(
   protected void runInner(Operations ops) {
     log.info("Running TableStatsCollectorApp for table {}", fqtn);
 
-    // Run stats collection and commit events collection in parallel
+    // Run stats collection, commit events collection, and partition events collection in parallel
     long startTime = System.currentTimeMillis();
 
     CompletableFuture<IcebergTableStats> statsFuture =
@@ -49,8 +50,14 @@ protected void runInner(Operations ops) {
             () -> ops.collectCommitEventTable(fqtn),
             result -> String.format("%s (%d events)", fqtn, result.size()));
 
-    // Wait for both to complete
-    CompletableFuture.allOf(statsFuture, commitEventsFuture).join();
+    CompletableFuture<List<CommitEventTablePartitions>> partitionEventsFuture =
+        executeWithTimingAsync(
+            "partition events collection",
+            () -> ops.collectCommitEventTablePartitions(fqtn),
+            result -> String.format("%s (%d partition events)", fqtn, result.size()));
+
+    // Wait for all three to complete
+    CompletableFuture.allOf(statsFuture, commitEventsFuture, partitionEventsFuture).join();
 
     long endTime = System.currentTimeMillis();
     log.info(
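`executeWithTimingAsync` is a pre-existing helper that this hunk reuses but the diff does not show. Judging only from the call sites above, it presumably looks roughly like the sketch below; the executor choice and log format are assumptions.

```java
// Assumed shape of the existing helper, reconstructed from its call sites; not the
// actual implementation. Requires java.util.function.Supplier and Function imports.
protected <T> CompletableFuture<T> executeWithTimingAsync(
    String operationName, Supplier<T> task, Function<T, String> resultFormatter) {
  return CompletableFuture.supplyAsync(
      () -> {
        long start = System.currentTimeMillis();
        T result = task.get();
        log.info(
            "Completed {} in {} ms: {}",
            operationName,
            System.currentTimeMillis() - start,
            resultFormatter.apply(result));
        return result;
      });
}
```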
@@ -74,6 +81,16 @@ protected void runInner(Operations ops) {
"Skipping commit events publishing for table: {} due to collection failure or no events",
fqtn);
}

List<CommitEventTablePartitions> partitionEvents = partitionEventsFuture.join();
if (partitionEvents != null && !partitionEvents.isEmpty()) {
publishPartitionEvents(partitionEvents);
} else {
log.info(
"Skipping partition events publishing for table: {} "
+ "(unpartitioned table or collection failure or no events)",
fqtn);
}
}

/**
@@ -100,6 +117,20 @@ protected void publishCommitEvents(List<CommitEventTable> commitEvents) {
     log.info(new Gson().toJson(commitEvents));
   }
 
+  /**
+   * Publish partition-level commit events. Override this method in li-openhouse to send to Kafka.
+   *
+   * @param partitionEvents List of partition events to publish
+   */
+  protected void publishPartitionEvents(List<CommitEventTablePartitions> partitionEvents) {
+    // Set event timestamp at publish time
+    long eventTimestampInEpochMs = System.currentTimeMillis();
+    partitionEvents.forEach(event -> event.setEventTimestampMs(eventTimestampInEpochMs));
+
+    log.info("Publishing partition events for table: {}", fqtn);
+    log.info(new Gson().toJson(partitionEvents));
+  }
+
   public static void main(String[] args) {
     OtelEmitter otelEmitter =
         new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
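Per the javadoc on `publishPartitionEvents`, the base implementation only logs; li-openhouse is expected to override it to publish to Kafka. A rough sketch of such an override, assuming a `KafkaProducer<String, String>` field and a made-up topic name (nothing Kafka-related appears in this diff):

```java
// Hypothetical override in a li-openhouse subclass of TableStatsCollectionSparkApp.
// `kafkaProducer` is an assumed KafkaProducer<String, String> field initialized
// elsewhere, and the topic name below is made up for illustration.
@Override
protected void publishPartitionEvents(List<CommitEventTablePartitions> partitionEvents) {
  // Keep the base behavior: it stamps event_timestamp_ms on every event and logs the payload.
  super.publishPartitionEvents(partitionEvents);

  Gson gson = new Gson();
  for (CommitEventTablePartitions event : partitionEvents) {
    // Key by table name so all events for a table land on the same Kafka partition.
    kafkaProducer.send(
        new ProducerRecord<>("table-partition-commit-events", fqtn, gson.toJson(event)));
  }
  kafkaProducer.flush();
}
```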
TableStatsCollector.java
@@ -1,6 +1,7 @@
 package com.linkedin.openhouse.jobs.util;
 
 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
+import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import java.util.List;
 import lombok.AllArgsConstructor;
@@ -47,4 +48,25 @@ public IcebergTableStats collectTableStats() {
   public List<CommitEventTable> collectCommitEventTable() {
     return TableStatsCollectorUtil.populateCommitEventTable(table, spark);
   }
+
+  /**
+   * Collect partition-level commit events for the table.
+   *
+   * <p>Returns one record per (commit_id, partition) pair. Returns an empty list for unpartitioned
+   * tables.
+   *
+   * <p>Note: returns a List, so the full result is loaded into memory. The size stays manageable
+   * because:
+   *
+   * <ul>
+   *   <li>Iceberg snapshot retention limits active snapshots (~1-10k per table)
+   *   <li>Typical partitions per commit: 10-1000
+   *   <li>Typical size: 100K rows × 200 bytes = 20MB
+   * </ul>
+   *
+   * @return List of CommitEventTablePartitions objects (event_timestamp_ms will be set at publish
+   *     time)
+   */
+  public List<CommitEventTablePartitions> collectCommitEventTablePartitions() {
+    return TableStatsCollectorUtil.populateCommitEventTablePartitions(table, spark);
+  }
 }
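`TableStatsCollectorUtil.populateCommitEventTablePartitions` is called here but not shown in the diff. As a rough sketch of how such a helper might derive the (commit_id, partition) pairs with the plain Iceberg API; the real implementation may well push this into Spark instead, and the field names follow the hypothetical model sketched earlier:

```java
import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.spark.sql.SparkSession;

// Hypothetical sketch of the helper; not the code behind this PR.
static List<CommitEventTablePartitions> populateCommitEventTablePartitions(
    Table table, SparkSession spark) { // spark unused in this naive, driver-side version
  List<CommitEventTablePartitions> events = new ArrayList<>();
  // Documented behavior: unpartitioned tables yield an empty list.
  if (table.spec().isUnpartitioned()) {
    return events;
  }
  // One record per (commit_id, partition): walk each retained snapshot and
  // collect the distinct partitions touched by the files it added.
  for (Snapshot snapshot : table.snapshots()) {
    Set<String> partitions = new HashSet<>();
    for (DataFile file : snapshot.addedDataFiles(table.io())) {
      partitions.add(table.specs().get(file.specId()).partitionToPath(file.partition()));
    }
    for (String partition : partitions) {
      CommitEventTablePartitions event = new CommitEventTablePartitions();
      event.setTableName(table.name());
      event.setCommitId(snapshot.snapshotId());
      event.setPartition(partition);
      events.add(event);
    }
  }
  return events;
}
```

Snapshot retention (as noted in the javadoc) is what bounds the loop above: only snapshots that have not yet been expired contribute events.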