diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index 43b898c39..ff5cd3296 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -5,6 +5,7 @@ import com.google.gson.Gson; import com.linkedin.openhouse.common.metrics.OtelEmitter; import com.linkedin.openhouse.common.stats.model.CommitEventTable; +import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions; import com.linkedin.openhouse.common.stats.model.IcebergTableStats; import com.linkedin.openhouse.jobs.util.SparkJobUtil; import com.linkedin.openhouse.jobs.util.TableStatsCollector; @@ -564,4 +565,29 @@ public List collectCommitEventTable(String fqtn) { return Collections.emptyList(); } } + + /** + * Collect partition-level commit events for a given fully-qualified table name. + * + *

Returns one record per (commit_id, partition) pair. Returns empty list for unpartitioned + * tables or errors. + * + * @param fqtn fully-qualified table name + * @return List of CommitEventTablePartitions objects (event_timestamp_ms will be set at publish + * time) + */ + public List collectCommitEventTablePartitions(String fqtn) { + Table table = getTable(fqtn); + + try { + TableStatsCollector tableStatsCollector = new TableStatsCollector(fs(), spark, table); + return tableStatsCollector.collectCommitEventTablePartitions(); + } catch (IOException e) { + log.error("Unable to initialize file system for partition events collection", e); + return Collections.emptyList(); + } catch (Exception e) { + log.error("Failed to collect partition events for table: {}", fqtn, e); + return Collections.emptyList(); + } + } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java index 9e6297f89..d30dbcc3b 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java @@ -4,6 +4,7 @@ import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; import com.linkedin.openhouse.common.metrics.OtelEmitter; import com.linkedin.openhouse.common.stats.model.CommitEventTable; +import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions; import com.linkedin.openhouse.common.stats.model.IcebergTableStats; import com.linkedin.openhouse.jobs.spark.state.StateManager; import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; @@ -34,7 +35,7 @@ public TableStatsCollectionSparkApp( protected void runInner(Operations ops) { log.info("Running TableStatsCollectorApp for table {}", fqtn); - // Run stats collection and commit events collection in parallel + // Run stats collection, commit events 
collection, and partition events collection in parallel long startTime = System.currentTimeMillis(); CompletableFuture statsFuture = @@ -49,8 +50,14 @@ protected void runInner(Operations ops) { () -> ops.collectCommitEventTable(fqtn), result -> String.format("%s (%d events)", fqtn, result.size())); - // Wait for both to complete - CompletableFuture.allOf(statsFuture, commitEventsFuture).join(); + CompletableFuture> partitionEventsFuture = + executeWithTimingAsync( + "partition events collection", + () -> ops.collectCommitEventTablePartitions(fqtn), + result -> String.format("%s (%d partition events)", fqtn, result.size())); + + // Wait for all three to complete + CompletableFuture.allOf(statsFuture, commitEventsFuture, partitionEventsFuture).join(); long endTime = System.currentTimeMillis(); log.info( @@ -74,6 +81,16 @@ protected void runInner(Operations ops) { "Skipping commit events publishing for table: {} due to collection failure or no events", fqtn); } + + List partitionEvents = partitionEventsFuture.join(); + if (partitionEvents != null && !partitionEvents.isEmpty()) { + publishPartitionEvents(partitionEvents); + } else { + log.info( + "Skipping partition events publishing for table: {} " + + "(unpartitioned table or collection failure or no events)", + fqtn); + } } /** @@ -100,6 +117,20 @@ protected void publishCommitEvents(List commitEvents) { log.info(new Gson().toJson(commitEvents)); } + /** + * Publish partition-level commit events. Override this method in li-openhouse to send to Kafka. 
+ * + * @param partitionEvents List of partition events to publish + */ + protected void publishPartitionEvents(List partitionEvents) { + // Set event timestamp at publish time + long eventTimestampInEpochMs = System.currentTimeMillis(); + partitionEvents.forEach(event -> event.setEventTimestampMs(eventTimestampInEpochMs)); + + log.info("Publishing partition events for table: {}", fqtn); + log.info(new Gson().toJson(partitionEvents)); + } + public static void main(String[] args) { OtelEmitter otelEmitter = new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry())); diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollector.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollector.java index a0da894bd..ea82178ba 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollector.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollector.java @@ -1,6 +1,7 @@ package com.linkedin.openhouse.jobs.util; import com.linkedin.openhouse.common.stats.model.CommitEventTable; +import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions; import com.linkedin.openhouse.common.stats.model.IcebergTableStats; import java.util.List; import lombok.AllArgsConstructor; @@ -47,4 +48,25 @@ public IcebergTableStats collectTableStats() { public List collectCommitEventTable() { return TableStatsCollectorUtil.populateCommitEventTable(table, spark); } + + /** + * Collect partition-level commit events for the table. + * + *

Returns one record per (commit_id, partition) pair. Returns empty list for unpartitioned + * tables. + * + *

Note: Returns List (loads into memory). Size is manageable due to: + * + *

+ * + * @return List of CommitEventTablePartitions objects (event_timestamp_ms will be set at publish + * time) + */ + public List collectCommitEventTablePartitions() { + return TableStatsCollectorUtil.populateCommitEventTablePartitions(table, spark); + } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java index cd14d678f..75cf9b67f 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java @@ -6,13 +6,19 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; +import com.linkedin.openhouse.common.stats.model.BaseEventModels; +import com.linkedin.openhouse.common.stats.model.ColumnData; import com.linkedin.openhouse.common.stats.model.CommitEventTable; +import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions; +import com.linkedin.openhouse.common.stats.model.CommitMetadata; +import com.linkedin.openhouse.common.stats.model.CommitOperation; import com.linkedin.openhouse.common.stats.model.HistoryPolicyStatsSchema; import com.linkedin.openhouse.common.stats.model.IcebergTableStats; import com.linkedin.openhouse.common.stats.model.PolicyStats; import com.linkedin.openhouse.common.stats.model.RetentionStatsSchema; import com.linkedin.openhouse.tables.client.model.TimePartitionSpec; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -27,6 +33,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.FileContent; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -39,6 +46,7 @@ import 
org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; +import scala.collection.JavaConverters; /** Utility class to collect stats for a given table. */ @Slf4j @@ -496,6 +504,272 @@ public static List populateCommitEventTable(Table table, Spark return commitEventTableList; } + /** + * Collect partition-level commit events for a table. + * + *

For each commit, identifies all affected partitions and creates one + * CommitEventTablePartitions record per (commit_id, partition) pair. + * + *

Uses Row API pattern: Query in Spark, collect to driver, transform in Java with full type + * safety. This matches the existing populateCommitEventTable() pattern. + * + *

Behavior: + * + *

    + *
+ *   <li>Unpartitioned tables → Returns empty list
+ *   <li>Queries all_entries metadata table for affected partitions
+ *   <li>Joins with snapshots to get commit metadata
+ *   <li>Transforms partition values to typed ColumnData objects
+ * </ul>
+ * + * @param table Iceberg table instance + * @param spark SparkSession + * @return List of CommitEventTablePartitions objects (one per commit-partition pair) + */ + public static List populateCommitEventTablePartitions( + Table table, SparkSession spark) { + + String fullTableName = table.name(); + log.info("Collecting partition-level commit events for table: {}", fullTableName); + + // Step 1: Check if table is partitioned + PartitionSpec spec = table.spec(); + if (spec.isUnpartitioned()) { + log.info("Table {} is unpartitioned, no partition events to collect", fullTableName); + return Collections.emptyList(); + } + + // Step 2: Parse table name components + String dbName = getDatabaseName(fullTableName); + if (dbName == null) { + return Collections.emptyList(); + } + + TableIdentifier identifier = TableIdentifier.parse(fullTableName); + String tableName = identifier.name(); + String clusterName = getClusterName(spark); + String tableMetadataLocation = table.location(); + String partitionSpecString = spec.toString(); + + // Extract partition column names from spec + List partitionColumnNames = + spec.fields().stream().map(f -> f.name()).collect(Collectors.toList()); + + // Step 3: Query all_entries metadata table for partitions per commit + // Use DISTINCT to deduplicate (snapshot_id, partition) pairs + // No status filter - captures all affected partitions (ADDED or DELETED files) + // Note: We query snapshots here even though populateCommitEventTable() also queries it. + // This is intentional to maintain parallel execution (both methods run simultaneously). + // Snapshots query is fast (~10-50ms, hits Iceberg metadata cache). 
+ String allEntriesQuery = + String.format( + "SELECT DISTINCT snapshot_id, data_file.partition " + "FROM %s.all_entries", + table.name()); + + log.info("Executing all_entries query: {}", allEntriesQuery); + Dataset partitionsPerCommitDF = spark.sql(allEntriesQuery); + + // Cache for reuse + partitionsPerCommitDF.cache(); + long totalRecords = partitionsPerCommitDF.count(); + + if (totalRecords == 0) { + log.info("No partition-level commit events found for table: {}", fullTableName); + partitionsPerCommitDF.unpersist(); + return Collections.emptyList(); + } + + log.info( + "Found {} partition-level commit event records for table: {}", totalRecords, fullTableName); + + // Step 4: Join with snapshots to get commit metadata + String snapshotsQuery = + String.format( + "SELECT snapshot_id, committed_at, operation, summary " + "FROM %s.snapshots", + table.name()); + + Dataset snapshotsDF = spark.sql(snapshotsQuery); + + // Join partitions with commit metadata + Dataset enrichedDF = + partitionsPerCommitDF + .join(snapshotsDF, "snapshot_id") + .select( + functions.col("snapshot_id"), + functions.col("committed_at").cast("long"), // Cast timestamp to epoch seconds + functions.col("operation"), + functions.col("summary"), + functions.col("partition")); // Keep partition struct for Java transformation + + // Step 6: Collect to driver and transform in Java with type safety + // This matches populateCommitEventTable() pattern which also uses collectAsList() + // Size is manageable: typically 100K rows × 200 bytes = 20MB + log.info("Collecting {} rows to driver for transformation", totalRecords); + List rows = enrichedDF.collectAsList(); + + partitionsPerCommitDF.unpersist(); + + // Step 7: Delegate transformation to helper method + // Separated for testability and readability + List result = + transformRowsToPartitionEvents( + rows, + dbName, + tableName, + clusterName, + tableMetadataLocation, + partitionSpecString, + partitionColumnNames); + + log.info( + "Collected {} 
partition-level commit events for table: {}", result.size(), fullTableName); + + return result; + } + + /** + * Transform Spark rows to CommitEventTablePartitions objects. + * + *

This is a pure transformation method that converts raw Spark rows into domain objects. + * Separated from query logic for better testability and maintainability. + * + *

Visibility: Package-private for testing purposes. + * + * @param rows Spark rows containing commit and partition data + * @param dbName Database name + * @param tableName Table name + * @param clusterName Cluster name + * @param tableMetadataLocation Table metadata location + * @param partitionSpecString Partition spec as string + * @param partitionColumnNames List of partition column names (in spec order) + * @return List of CommitEventTablePartitions objects + */ + static List transformRowsToPartitionEvents( + List rows, + String dbName, + String tableName, + String clusterName, + String tableMetadataLocation, + String partitionSpecString, + List partitionColumnNames) { + + List result = new ArrayList<>(); + + for (Row row : rows) { + try { + // Extract commit metadata + long snapshotId = row.getAs("snapshot_id"); + long committedAtSeconds = row.getAs("committed_at"); + long committedAtMs = committedAtSeconds * 1000L; + String operation = row.getAs("operation"); + + // Convert operation string to CommitOperation enum + CommitOperation commitOperation = null; + if (operation != null) { + try { + commitOperation = CommitOperation.valueOf(operation.toUpperCase()); + } catch (IllegalArgumentException e) { + log.warn("Unknown commit operation: {}, setting to null", operation); + } + } + + // Extract summary map (convert from Scala to Java) + scala.collection.immutable.Map scalaMap = row.getAs("summary"); + Map summary = JavaConverters.mapAsJavaMap(scalaMap); + + // Extract partition struct and transform to ColumnData + Row partitionRow = row.getAs("partition"); + List partitionData = + transformPartitionRowToColumnData(partitionRow, partitionColumnNames); + + // Build CommitEventTablePartitions object using builder pattern + CommitEventTablePartitions event = + CommitEventTablePartitions.builder() + .dataset( + BaseEventModels.BaseTableIdentifier.builder() + .databaseName(dbName) + .tableName(tableName) + .clusterName(clusterName) + 
.tableMetadataLocation(tableMetadataLocation) + .partitionSpec(partitionSpecString) + .build()) + .commitMetadata( + CommitMetadata.builder() + .commitId(snapshotId) + .commitTimestampMs(committedAtMs) + .commitAppId(summary.get("spark.app.id")) + .commitAppName(summary.get("spark.app.name")) + .commitOperation(commitOperation) + .build()) + .partitionData(partitionData) + .eventTimestampMs(0L) // Will be set at publish time + .build(); + + result.add(event); + + } catch (Exception e) { + log.error("Failed to transform row to CommitEventTablePartitions: {}", row, e); + // Continue processing other rows (don't fail entire batch) + } + } + + return result; + } + + /** + * Transform Iceberg partition Row to List of ColumnData with full type safety. + * + *

This method handles different partition column types and creates the appropriate ColumnData + * subclass for each value. Uses instanceof checks for type safety. + * + *

Supported types: + * + *

    + *
+ *   <li>Integer/Long → LongColumnData
+ *   <li>Float/Double → DoubleColumnData
+ *   <li>String/Date/Timestamp/Others → StringColumnData
+ * </ul>
+ * + *

Visibility: Package-private for testing purposes. + * + * @param partitionRow Spark Row containing partition column values + * @param columnNames List of partition column names (in spec order) + * @return List of ColumnData with typed values + */ + static List transformPartitionRowToColumnData( + Row partitionRow, List columnNames) { + + List result = new ArrayList<>(); + + for (int i = 0; i < columnNames.size(); i++) { + String colName = columnNames.get(i); + Object value = partitionRow.get(i); + + if (value == null) { + // Skip null partition values (shouldn't happen in valid Iceberg data) + log.warn("Null partition value for column: {}", colName); + continue; + } + + // Determine type and create appropriate ColumnData + // Order matters: check more specific types first + if (value instanceof Long) { + result.add(new ColumnData.LongColumnData(colName, (Long) value)); + } else if (value instanceof Integer) { + result.add(new ColumnData.LongColumnData(colName, ((Integer) value).longValue())); + } else if (value instanceof Double) { + result.add(new ColumnData.DoubleColumnData(colName, (Double) value)); + } else if (value instanceof Float) { + result.add(new ColumnData.DoubleColumnData(colName, ((Float) value).doubleValue())); + } else { + // Default: treat as string (handles String, Date, Timestamp, etc.) + result.add(new ColumnData.StringColumnData(colName, value.toString())); + } + } + + return result; + } + /** * Extract database name from fully-qualified table name. 
* diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkAppTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkAppTest.java index b0966fd12..025806871 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkAppTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkAppTest.java @@ -3,7 +3,9 @@ import com.google.gson.Gson; import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; import com.linkedin.openhouse.common.metrics.OtelEmitter; +import com.linkedin.openhouse.common.stats.model.ColumnData; import com.linkedin.openhouse.common.stats.model.CommitEventTable; +import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions; import com.linkedin.openhouse.common.stats.model.IcebergTableStats; import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; import com.linkedin.openhouse.tablestest.OpenHouseSparkITest; @@ -434,6 +436,342 @@ public void testAppInstantiationDirectly() throws Exception { } } + // ==================== Partition Events Tests ==================== + + @Test + public void testPartitionEventsForPartitionedTable() throws Exception { + final String tableName = "db.test_partition_events"; + final int numInserts = 3; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create partitioned table with data in different partitions + prepareTable(ops, tableName, true); // partitioned by days(ts) + + // Insert data into 3 different partitions (3 commits, 3 partitions) + populateTable(ops, tableName, 1, 0); // Today + populateTable(ops, tableName, 1, 1); // Yesterday + populateTable(ops, tableName, 1, 2); // 2 days ago + + // Action: Collect partition events + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + // Verify: Should have 3 partition events (1 per commit-partition pair) + 
Assertions.assertFalse(partitionEvents.isEmpty()); + Assertions.assertEquals(numInserts, partitionEvents.size()); + + // Verify: All events have partition data + for (CommitEventTablePartitions event : partitionEvents) { + Assertions.assertNotNull(event.getPartitionData()); + Assertions.assertFalse(event.getPartitionData().isEmpty()); + + // Verify: Partition data has at least one column + Assertions.assertTrue(event.getPartitionData().size() > 0); + + // Verify: Partition values are typed ColumnData + ColumnData firstColumn = event.getPartitionData().get(0); + Assertions.assertNotNull(firstColumn); + Assertions.assertNotNull(firstColumn.getColumnName()); + } + + log.info("Partition events collection validated: {} events", partitionEvents.size()); + } + } + + @Test + public void testPartitionEventsForUnpartitionedTable() throws Exception { + final String tableName = "db.test_unpartitioned_partition_events"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create unpartitioned table + prepareTable(ops, tableName, false); // NOT partitioned + populateTable(ops, tableName, 2); + + // Action: Collect partition events + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + // Verify: Empty list for unpartitioned tables + Assertions.assertTrue( + partitionEvents.isEmpty(), "Unpartitioned table should return empty partition events"); + + log.info("Unpartitioned table correctly returns empty partition events"); + } + } + + @Test + public void testPartitionEventsSchemaValidation() throws Exception { + final String tableName = "db.test_partition_schema"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create partitioned table + prepareTable(ops, tableName, true); + populateTable(ops, tableName, 1); + + // Action: Collect partition events + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + 
Assertions.assertFalse(partitionEvents.isEmpty()); + CommitEventTablePartitions firstEvent = partitionEvents.get(0); + + // Verify: Dataset fields + Assertions.assertNotNull(firstEvent.getDataset()); + Assertions.assertEquals("db", firstEvent.getDataset().getDatabaseName()); + Assertions.assertEquals("test_partition_schema", firstEvent.getDataset().getTableName()); + Assertions.assertNotNull(firstEvent.getDataset().getClusterName()); + Assertions.assertNotNull(firstEvent.getDataset().getTableMetadataLocation()); + Assertions.assertNotNull(firstEvent.getDataset().getPartitionSpec()); + + // Verify: Commit metadata fields + Assertions.assertNotNull(firstEvent.getCommitMetadata()); + Assertions.assertNotNull(firstEvent.getCommitMetadata().getCommitId()); + Assertions.assertNotNull(firstEvent.getCommitMetadata().getCommitTimestampMs()); + Assertions.assertTrue(firstEvent.getCommitMetadata().getCommitTimestampMs() > 0); + + // Verify: Partition data (required for partitioned tables) + Assertions.assertNotNull(firstEvent.getPartitionData()); + Assertions.assertFalse(firstEvent.getPartitionData().isEmpty()); + + // Verify: Event timestamp is placeholder (set at publish time) + Assertions.assertEquals(0L, firstEvent.getEventTimestampMs()); + + log.info("Partition events schema validated successfully"); + } + } + + @Test + public void testPartitionEventsWithMultiplePartitions() throws Exception { + final String tableName = "db.test_multiple_partitions"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create table and insert into multiple partitions + prepareTable(ops, tableName, true); + + // Insert 2 rows (creates 2 separate commits, same partition) + populateTable(ops, tableName, 2, 0); // 2 commits to partition day=0 + // Insert 2 rows (creates 2 separate commits, different partition) + populateTable(ops, tableName, 2, 1); // 2 commits to partition day=1 + // Insert 1 row (creates 1 commit, yet another partition) + 
populateTable(ops, tableName, 1, 2); // 1 commit to partition day=2 + + // Action: Collect partition events + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + // Verify: Should have 5 partition events (1 per commit-partition pair) + // Note: populateTable creates 1 commit per row, so 2+2+1 = 5 commits total + Assertions.assertEquals(5, partitionEvents.size()); + + // Verify: All commit IDs are present + List commitIds = + partitionEvents.stream() + .map(e -> e.getCommitMetadata().getCommitId()) + .collect(Collectors.toList()); + Assertions.assertEquals(5, commitIds.size()); + + // Verify: Commit IDs are unique (each INSERT creates a unique commit) + long uniqueCommitIds = commitIds.stream().distinct().count(); + Assertions.assertEquals(5, uniqueCommitIds); + + log.info( + "Multiple partitions handled correctly: {} partition events", partitionEvents.size()); + } + } + + @Test + public void testPartitionDataTypeHandling() throws Exception { + final String tableName = "db.test_partition_types"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create partitioned table + prepareTable(ops, tableName, true); + populateTable(ops, tableName, 1); + + // Action: Collect partition events + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + Assertions.assertFalse(partitionEvents.isEmpty()); + CommitEventTablePartitions event = partitionEvents.get(0); + + // Verify: Partition data contains typed ColumnData objects + List partitionData = event.getPartitionData(); + Assertions.assertNotNull(partitionData); + Assertions.assertFalse(partitionData.isEmpty()); + + // Verify: Each column has name and value + for (ColumnData columnData : partitionData) { + Assertions.assertNotNull(columnData.getColumnName()); + + // Partition columns in our test are days(ts) which produces Integer/Long values + // Verify it's one of the supported types + boolean isValidType = + columnData instanceof 
ColumnData.LongColumnData + || columnData instanceof ColumnData.DoubleColumnData + || columnData instanceof ColumnData.StringColumnData; + + Assertions.assertTrue( + isValidType, "Partition column should be one of the supported ColumnData types"); + } + + log.info("Partition data types validated successfully"); + } + } + + @Test + public void testPublishPartitionEvents() throws Exception { + final String tableName = "db.test_publish_partition_events"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create partitioned table + prepareTable(ops, tableName, true); + populateTable(ops, tableName, 2, 0); + populateTable(ops, tableName, 1, 1); + + // Action: Collect and publish partition events + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + TableStatsCollectionSparkApp app = + new TableStatsCollectionSparkApp("test-job", null, tableName, otelEmitter); + app.publishPartitionEvents(partitionEvents); + + // Verify: Event timestamps are set after publishing + long publishTime = System.currentTimeMillis(); + for (CommitEventTablePartitions event : partitionEvents) { + Assertions.assertTrue( + event.getEventTimestampMs() > 0, "Event timestamp should be set after publishing"); + Assertions.assertTrue( + event.getEventTimestampMs() <= publishTime, + "Event timestamp should not be in the future"); + } + + // Verify: JSON serialization works + String json = new Gson().toJson(partitionEvents); + Assertions.assertNotNull(json); + Assertions.assertTrue(json.contains("partitionData")); + Assertions.assertTrue(json.contains("commitId")); + + log.info("Partition events publishing validated with {} events", partitionEvents.size()); + } + } + + @Test + public void testPartitionEventsIntegrationWithFullApp() throws Exception { + final String tableName = "db.test_partition_integration"; + final int numCommits = 2; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: 
Create partitioned table with multiple commits + prepareTable(ops, tableName, true); + populateTable(ops, tableName, 1, 0); // Commit 1, partition 1 + populateTable(ops, tableName, 1, 1); // Commit 2, partition 2 + + // Action: Run full app (should collect stats, commit events, AND partition events) + TableStatsCollectionSparkApp app = + new TableStatsCollectionSparkApp("test-job", null, tableName, otelEmitter); + app.runInner(ops); + + // Verify: Stats collected + IcebergTableStats stats = ops.collectTableStats(tableName); + Assertions.assertNotNull(stats); + Assertions.assertEquals(numCommits, stats.getNumReferencedDataFiles()); + + // Verify: Commit events collected + List commitEvents = ops.collectCommitEventTable(tableName); + Assertions.assertEquals(numCommits, commitEvents.size()); + + // Verify: Partition events collected + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + Assertions.assertEquals(numCommits, partitionEvents.size()); + + // Verify: Commit IDs match between commit events and partition events + List commitEventIds = + commitEvents.stream() + .map(e -> e.getCommitMetadata().getCommitId()) + .sorted() + .collect(Collectors.toList()); + + List partitionEventCommitIds = + partitionEvents.stream() + .map(e -> e.getCommitMetadata().getCommitId()) + .sorted() + .collect(Collectors.toList()); + + Assertions.assertEquals( + commitEventIds, + partitionEventCommitIds, + "Commit IDs should match between commit events and partition events"); + + log.info( + "Full app integration validated: {} commits, {} partition events", + commitEvents.size(), + partitionEvents.size()); + } + } + + @Test + public void testPartitionEventsWithNoData() throws Exception { + final String tableName = "db.test_partition_no_data"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create partitioned table with no data + prepareTable(ops, tableName, true); + + // Action: Collect partition events + List 
partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + // Verify: Empty list (no commits = no partition events) + Assertions.assertTrue( + partitionEvents.isEmpty(), "Table with no data should return empty partition events"); + + log.info("Empty table handled correctly for partition events"); + } + } + + @Test + public void testPartitionEventsParallelExecution() throws Exception { + final String tableName = "db.test_parallel_execution"; + + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + // Setup: Create partitioned table + prepareTable(ops, tableName, true); + populateTable(ops, tableName, 2, 0); + + // Action: Run app which executes all three collections in parallel + long startTime = System.currentTimeMillis(); + + TableStatsCollectionSparkApp app = + new TableStatsCollectionSparkApp("test-job", null, tableName, otelEmitter); + app.runInner(ops); + + long endTime = System.currentTimeMillis(); + long parallelExecutionTime = endTime - startTime; + + // Verify: All three collections completed + IcebergTableStats stats = ops.collectTableStats(tableName); + List commitEvents = ops.collectCommitEventTable(tableName); + List partitionEvents = + ops.collectCommitEventTablePartitions(tableName); + + Assertions.assertNotNull(stats); + Assertions.assertFalse(commitEvents.isEmpty()); + Assertions.assertFalse(partitionEvents.isEmpty()); + + // Note: We can't easily verify parallel execution is faster without baseline, + // but we can verify all three completed successfully + log.info( + "Parallel execution completed in {} ms: stats={}, commits={}, partitions={}", + parallelExecutionTime, + stats != null, + commitEvents.size(), + partitionEvents.size()); + } + } + // ==================== Helper Methods ==================== private static void prepareTable(Operations ops, String tableName) { diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtilTest.java 
b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtilTest.java index 22ab11863..0f8445dbe 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtilTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtilTest.java @@ -1,5 +1,12 @@ package com.linkedin.openhouse.jobs.util; +import com.linkedin.openhouse.common.stats.model.ColumnData; +import java.util.Arrays; +import java.util.List; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -54,4 +61,185 @@ public void testGetDatabaseName_withEmptyString() { String result = TableStatsCollectorUtil.getDatabaseName(""); Assertions.assertNull(result); } + + // ==================== Partition Data Transformation Tests ==================== + + @Test + public void testTransformPartitionRowToColumnData_withLongType() { + // Test: Partition column with Long value (e.g., year=2024) + StructType schema = + new StructType().add("year", DataTypes.LongType).add("month", DataTypes.LongType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {2024L, 12L}, schema); + List columnNames = Arrays.asList("year", "month"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(2, result.size()); + Assertions.assertInstanceOf(ColumnData.LongColumnData.class, result.get(0)); + Assertions.assertEquals("year", result.get(0).getColumnName()); + Assertions.assertEquals(2024L, ((ColumnData.LongColumnData) result.get(0)).getValue()); + } + + @Test + public void testTransformPartitionRowToColumnData_withIntegerType() { + // Test: Partition column with Integer value (converted to Long) + StructType schema = new StructType().add("day", 
DataTypes.IntegerType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {15}, schema); + List columnNames = Arrays.asList("day"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(1, result.size()); + Assertions.assertInstanceOf(ColumnData.LongColumnData.class, result.get(0)); + Assertions.assertEquals("day", result.get(0).getColumnName()); + Assertions.assertEquals(15L, ((ColumnData.LongColumnData) result.get(0)).getValue()); + } + + @Test + public void testTransformPartitionRowToColumnData_withDoubleType() { + // Test: Partition column with Double value + StructType schema = new StructType().add("score", DataTypes.DoubleType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {99.5}, schema); + List columnNames = Arrays.asList("score"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(1, result.size()); + Assertions.assertInstanceOf(ColumnData.DoubleColumnData.class, result.get(0)); + Assertions.assertEquals("score", result.get(0).getColumnName()); + Assertions.assertEquals(99.5, ((ColumnData.DoubleColumnData) result.get(0)).getValue()); + } + + @Test + public void testTransformPartitionRowToColumnData_withFloatType() { + // Test: Partition column with Float value (converted to Double) + StructType schema = new StructType().add("rating", DataTypes.FloatType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {4.5f}, schema); + List columnNames = Arrays.asList("rating"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(1, result.size()); + Assertions.assertInstanceOf(ColumnData.DoubleColumnData.class, result.get(0)); + Assertions.assertEquals("rating", result.get(0).getColumnName()); + Assertions.assertEquals(4.5, ((ColumnData.DoubleColumnData) result.get(0)).getValue(), 0.01); + 
} + + @Test + public void testTransformPartitionRowToColumnData_withStringType() { + // Test: Partition column with String value + StructType schema = new StructType().add("region", DataTypes.StringType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {"US"}, schema); + List columnNames = Arrays.asList("region"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(1, result.size()); + Assertions.assertInstanceOf(ColumnData.StringColumnData.class, result.get(0)); + Assertions.assertEquals("region", result.get(0).getColumnName()); + Assertions.assertEquals("US", ((ColumnData.StringColumnData) result.get(0)).getValue()); + } + + @Test + public void testTransformPartitionRowToColumnData_withMultipleColumns() { + // Test: Multiple partition columns with different types + StructType schema = + new StructType() + .add("year", DataTypes.LongType) + .add("region", DataTypes.StringType) + .add("score", DataTypes.DoubleType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {2024L, "EU", 95.5}, schema); + List columnNames = Arrays.asList("year", "region", "score"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(3, result.size()); + + // Verify year (Long) + Assertions.assertInstanceOf(ColumnData.LongColumnData.class, result.get(0)); + Assertions.assertEquals("year", result.get(0).getColumnName()); + Assertions.assertEquals(2024L, ((ColumnData.LongColumnData) result.get(0)).getValue()); + + // Verify region (String) + Assertions.assertInstanceOf(ColumnData.StringColumnData.class, result.get(1)); + Assertions.assertEquals("region", result.get(1).getColumnName()); + Assertions.assertEquals("EU", ((ColumnData.StringColumnData) result.get(1)).getValue()); + + // Verify score (Double) + Assertions.assertInstanceOf(ColumnData.DoubleColumnData.class, result.get(2)); + 
Assertions.assertEquals("score", result.get(2).getColumnName()); + Assertions.assertEquals(95.5, ((ColumnData.DoubleColumnData) result.get(2)).getValue()); + } + + @Test + public void testTransformPartitionRowToColumnData_withNullValue() { + // Test: Null partition value is skipped (logged but not added to result) + StructType schema = new StructType().add("region", DataTypes.StringType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {null}, schema); + List columnNames = Arrays.asList("region"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + // Null value should be skipped + Assertions.assertEquals(0, result.size()); + } + + @Test + public void testTransformPartitionRowToColumnData_withMixedNullAndValid() { + // Test: Mix of null and valid values - only valid values in result + StructType schema = + new StructType() + .add("year", DataTypes.LongType) + .add("region", DataTypes.StringType) + .add("month", DataTypes.IntegerType); + + Row partitionRow = new GenericRowWithSchema(new Object[] {2024L, null, 12}, schema); + List columnNames = Arrays.asList("year", "region", "month"); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + // Only 2 values (region is null and skipped) + Assertions.assertEquals(2, result.size()); + Assertions.assertEquals("year", result.get(0).getColumnName()); + Assertions.assertEquals("month", result.get(1).getColumnName()); + } + + @Test + public void testTransformPartitionRowToColumnData_withEmptyRow() { + // Test: Empty row (no columns) returns empty list + StructType schema = new StructType(); + Row partitionRow = new GenericRowWithSchema(new Object[] {}, schema); + List columnNames = Arrays.asList(); + + List result = + TableStatsCollectorUtil.transformPartitionRowToColumnData(partitionRow, columnNames); + + Assertions.assertEquals(0, result.size()); + } + + // Note: transformRowsToPartitionEvents() is 
tested via integration tests in + // TableStatsCollectionSparkAppTest because it requires: + // 1. Complex Row structure with nested fields (partition, summary map, etc.) + // 2. Scala collection conversion (summary map) + // 3. Realistic Iceberg metadata structures + // + // Integration tests provide better coverage: + // - testPartitionEventsForPartitionedTable() + // - testPartitionEventsSchemaValidation() + // - testPartitionEventsWithMultiplePartitions() }