Skip to content

Commit c3c15c5

Browse files
committed
Add day wise snapshots details
1 parent b65b434 commit c3c15c5

File tree

3 files changed

+49
-0
lines changed

3 files changed

+49
-0
lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44

55
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
66
import java.io.IOException;
7+
import java.text.SimpleDateFormat;
8+
import java.util.Date;
9+
import java.util.LinkedHashMap;
10+
import java.util.Map;
711
import java.util.Optional;
12+
import java.util.stream.Collectors;
813
import java.util.stream.StreamSupport;
914
import lombok.extern.slf4j.Slf4j;
1015
import org.apache.hadoop.fs.FileSystem;
@@ -114,16 +119,42 @@ protected static IcebergTableStats populateStatsForSnapshots(
114119
.min(Long::compareTo)
115120
.orElse(null);
116121

122+
Map<String, Long> snapshotCountByDay =
123+
getSnapShotDistributionPerDay(table, spark, MetadataTableType.SNAPSHOTS);
124+
117125
return stats
118126
.toBuilder()
119127
.currentSnapshotId(currentSnapshotId)
120128
.currentSnapshotTimestamp(currentSnapshotTimestamp)
121129
.oldestSnapshotTimestamp(oldestSnapshotTimestamp)
122130
.numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
123131
.totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
132+
.snapshotCountByDay(snapshotCountByDay)
124133
.build();
125134
}
126135

136+
/** Get snapshot distribution for a given table by date. */
137+
private static Map<String, Long> getSnapShotDistributionPerDay(
138+
Table table, SparkSession spark, MetadataTableType metadataTableType) {
139+
Dataset<Row> snapShotDistribution =
140+
SparkTableUtil.loadMetadataTable(spark, table, metadataTableType)
141+
.selectExpr(new String[] {"snapshot_id", "committed_at"})
142+
.dropDuplicates("snapshot_id", "committed_at");
143+
144+
Map<String, Long> snapshotCountByDay =
145+
snapShotDistribution.collectAsList().stream()
146+
.collect(
147+
Collectors.toMap(
148+
row -> {
149+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
150+
return formatter.format(new Date(row.getTimestamp(1).getTime()));
151+
},
152+
row -> 1L,
153+
Long::sum,
154+
LinkedHashMap::new));
155+
return snapshotCountByDay;
156+
}
157+
127158
/**
128159
* Collect storage stats for a given fully-qualified table name.
129160
*

apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
import com.linkedin.openhouse.tables.client.model.Retention;
1010
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
1111
import io.opentelemetry.api.metrics.Meter;
12+
import java.text.SimpleDateFormat;
1213
import java.util.ArrayList;
14+
import java.util.Date;
1315
import java.util.List;
1416
import java.util.Map;
17+
import java.util.concurrent.atomic.AtomicLong;
1518
import java.util.function.BiFunction;
1619
import java.util.stream.Collectors;
1720
import lombok.extern.slf4j.Slf4j;
@@ -566,6 +569,18 @@ public void testCollectTableStats() throws Exception {
566569
+ stats.getNumExistingMetadataJsonFiles()
567570
+ stats.getNumReferencedManifestFiles()
568571
+ stats.getNumReferencedManifestLists());
572+
AtomicLong snapShotCount = new AtomicLong();
573+
574+
table
575+
.snapshots()
576+
.forEach(
577+
snapshot -> {
578+
snapShotCount.getAndIncrement();
579+
});
580+
581+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
582+
Assertions.assertEquals(
583+
stats.getSnapshotCountByDay().get(formatter.format(new Date())), snapShotCount.get());
569584
}
570585
}
571586

services/common/src/main/java/com/linkedin/openhouse/common/stats/model/IcebergTableStats.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.openhouse.common.stats.model;
22

3+
import java.util.Map;
34
import lombok.AllArgsConstructor;
45
import lombok.Getter;
56
import lombok.NoArgsConstructor;
@@ -35,4 +36,6 @@ public class IcebergTableStats extends BaseTableMetadata {
3536
private Long numReferencedManifestFiles;
3637

3738
private Long numReferencedManifestLists;
39+
40+
private Map<String, Long> snapshotCountByDay;
3841
}

0 commit comments

Comments
 (0)