Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -124,10 +125,20 @@ public void close() {
snapshot.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
PartitionStatisticsFile partitionStatisticsFile1 =
TaskTestUtils.writePartitionStatsFile(
snapshot.snapshotId(),
snapshot.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".partition_stats",
fileIO);
String firstMetadataFile = "v1-295495059.metadata.json";
TableMetadata firstMetadata =
TaskTestUtils.writeTableMetadata(
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
fileIO,
firstMetadataFile,
List.of(statisticsFile1),
List.of(partitionStatisticsFile1),
snapshot);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();

ManifestFile manifestFile3 =
Expand All @@ -148,6 +159,12 @@ public void close() {
snapshot2.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
PartitionStatisticsFile partitionStatisticsFile2 =
TaskTestUtils.writePartitionStatsFile(
snapshot2.snapshotId(),
snapshot2.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".partition_stats",
fileIO);
String secondMetadataFile = "v1-295495060.metadata.json";
TableMetadata secondMetadata =
TaskTestUtils.writeTableMetadata(
Expand All @@ -156,18 +173,21 @@ public void close() {
firstMetadata,
firstMetadataFile,
List.of(statisticsFile2),
List.of(partitionStatisticsFile2),
snapshot2);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();

List<String> cleanupFiles =
Stream.concat(
secondMetadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file)
.filter(file -> TaskUtils.exists(file, fileIO)),
secondMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.filter(file -> TaskUtils.exists(file, fileIO)))
Stream.of(
secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
secondMetadata.statisticsFiles().stream().map(StatisticsFile::path),
firstMetadata.partitionStatisticsFiles().stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the secondMetadata.partitionStatisticsFiles() is enough here, as it contains the entries for all the snapshots?

similar to statisticsFiles() that exists already.

.map(PartitionStatisticsFile::path),
secondMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path))
.flatMap(s -> s)
.filter(file -> TaskUtils.exists(file, fileIO))
.toList();

TaskEntity task =
Expand All @@ -183,12 +203,9 @@ public void close() {
assertThatPredicate(handler::canHandleTask).accepts(task);
assertThat(handler.handleTask(task, callCtx)).isTrue();

assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
.rejects(firstMetadataFile);
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
.rejects(statisticsFile1.path());
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
.rejects(statisticsFile2.path());
for (String cleanupFile : cleanupFiles) {
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -506,10 +507,20 @@ public void testTableCleanupMultipleMetadata() throws IOException {
snapshot.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
PartitionStatisticsFile partitionStatisticsFile1 =
TaskTestUtils.writePartitionStatsFile(
snapshot.snapshotId(),
snapshot.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".partition_stats",
fileIO);
String firstMetadataFile = "v1-295495059.metadata.json";
TableMetadata firstMetadata =
TaskTestUtils.writeTableMetadata(
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
fileIO,
firstMetadataFile,
List.of(statisticsFile1),
List.of(partitionStatisticsFile1),
snapshot);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();

ManifestFile manifestFile3 =
Expand All @@ -530,13 +541,20 @@ public void testTableCleanupMultipleMetadata() throws IOException {
snapshot2.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
PartitionStatisticsFile partitionStatisticsFile2 =
TaskTestUtils.writePartitionStatsFile(
snapshot2.snapshotId(),
snapshot2.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".partition_stats",
fileIO);
String secondMetadataFile = "v1-295495060.metadata.json";
TaskTestUtils.writeTableMetadata(
fileIO,
secondMetadataFile,
firstMetadata,
firstMetadataFile,
List.of(statisticsFile2),
List.of(partitionStatisticsFile2),
snapshot2);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
Expand Down Expand Up @@ -596,7 +614,9 @@ public void testTableCleanupMultipleMetadata() throws IOException {
snapshot.manifestListLocation(),
snapshot2.manifestListLocation(),
statisticsFile1.path(),
statisticsFile2.path())),
statisticsFile2.path(),
partitionStatisticsFile1.path(),
partitionStatisticsFile2.path())),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
Expand Down Expand Up @@ -71,7 +73,7 @@ static ManifestFile manifestFile(

static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
throws IOException {
return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots);
return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots);
}

static TableMetadata writeTableMetadata(
Expand All @@ -80,7 +82,18 @@ static TableMetadata writeTableMetadata(
List<StatisticsFile> statisticsFiles,
Snapshot... snapshots)
throws IOException {
return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots);
return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots);
}

static TableMetadata writeTableMetadata(
FileIO fileIO,
String metadataFile,
List<StatisticsFile> statisticsFiles,
List<PartitionStatisticsFile> partitionStatsFiles,
Snapshot... snapshots)
throws IOException {
return writeTableMetadata(
fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots);
}

static TableMetadata writeTableMetadata(
Expand All @@ -89,6 +102,7 @@ static TableMetadata writeTableMetadata(
TableMetadata prevMetadata,
String prevMetadataFile,
List<StatisticsFile> statisticsFiles,
List<PartitionStatisticsFile> partitionStatsFiles,
Snapshot... snapshots)
throws IOException {
TableMetadata.Builder tmBuilder;
Expand All @@ -106,11 +120,15 @@ static TableMetadata writeTableMetadata(
.addPartitionSpec(PartitionSpec.unpartitioned());

int statisticsFileIndex = 0;
int partitionStatsFileIndex = 0;
for (Snapshot snapshot : snapshots) {
tmBuilder.addSnapshot(snapshot);
if (statisticsFiles != null) {
tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++));
}
if (partitionStatsFiles != null) {
tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++));
}
}
TableMetadata tableMetadata = tmBuilder.build();
PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
Expand Down Expand Up @@ -161,4 +179,26 @@ public static StatisticsFile writeStatsFile(
puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
}
}

public static PartitionStatisticsFile writePartitionStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {

try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A partition stats file is not exactly a Puffin file.

It can be a file in the table's default format, like Parquet, Avro, or ORC.
Maybe we can write a dummy Avro or Parquet file.

more context from Iceberg repo: https://github.com/apache/iceberg/blob/696a72c0f88c3af1096e716b196f1609da34e50d/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java#L500

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From reading the test, it looks like anything can work so long as it's picked up by TableMetadata::partitionStatisticsFiles, is that right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, anything can work. I just don't want to leave the impression that it is a Puffin file. We can create a dummy file with a .parquet extension as I shared above. It is very minimal code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, anything can work. Just doesn't want to leave an impression that it is a puffin file. We can create a dummy file as .parquet extension as I shared above. It is very minimal code.

Thanks for catching it — I have adjusted the writing impl to use Parquet as the file format for partition stats, following the impl in https://github.com/apache/iceberg/blob/main/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java#L33

puffinWriter.add(
new Blob(
"some-blob-type",
List.of(1),
snapshotId,
snapshotSequenceNumber,
ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
puffinWriter.finish();

return ImmutableGenericPartitionStatisticsFile.builder()
.snapshotId(snapshotId)
.path(statsLocation)
.fileSizeInBytes(puffinWriter.fileSize())
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -112,7 +113,6 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
metaStoreManager,
polarisCallContext);

// TODO: handle partition statistics files
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these weren't left out due to an oversight but rather they were intentionally excluded. I'm curious if there is any background on why that is -- is there some specific pitfall related to cleaning up the partition stats?

Copy link
Contributor Author

@danielhumanmod danielhumanmod May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these weren't left out due to an oversight but rather they were intentionally excluded. I'm curious if there is any background on why that is -- is there some specific pitfall related to cleaning up the partition stats?

Good catch — to the best of my knowledge, Polaris drop-table pruning currently has a gap compared to Iceberg's implementation, for reasons I don't know either (curious too)

Iceberg will delete all file types under the metadata/ directory, including manifests, manifest lists, metadata files, previous metadata, and .stats files (both table and partition-level). Iceberg code pointer: CatalogUtil.java#L124 for reference.

This gap also discussed earlier in this issue comment.

Happy to learn more if there’s additional context I missed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, interesting. Thanks for sharing that link and that context

Stream<TaskEntity> metadataFileCleanupTasks =
getMetadataTaskStream(
cleanupTask,
Expand Down Expand Up @@ -243,12 +243,13 @@ private Stream<TaskEntity> getMetadataTaskStream(
private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) {
List<List<String>> result = new ArrayList<>();
List<String> metadataFiles =
Copy link
Contributor

@eric-maynard eric-maynard May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really accurate to call puffin files metadata files, and is it necessarily correct to group all of these together? I guess the intent here is to collect all of the not-data files?

Copy link
Contributor Author

@danielhumanmod danielhumanmod May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really accurate to call puffin files metadata files, and is it necessarily correct to group all of these together? I guess the intent here is to collect all of the not-data files?

Good point — the original intention was to group files under the metadata/ directory to reduce the overhead of scheduling tasks. As more file types like stats and partition stats were added later, nonDataFiles (or something similar) might now better reflect what’s being collected.

Curious to hear your thoughts — would it be clearer to separate them, or is the performance benefit of grouping still preferred?

Copy link
Contributor

@eric-maynard eric-maynard May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see... continuing to separate out the data files makes sense to me.

I think this came up on one of the previous PRs, but the real solution here needs to eventually involve moving this purge work out of the catalog server and into the maintenance service where we handle compaction etc. That's the only way to really achieve scalability.

Stream.concat(
Stream.concat(
tableMetadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file),
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)),
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
Stream.of(
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation),
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
tableMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path))
.flatMap(s -> s)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this line achieve? Does tableMetadata.partitionStatisticsFiles() return a Stream of Streams?

Copy link
Contributor Author

@danielhumanmod danielhumanmod May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question.

  1. what tableMetadata.partitionStatisticsFiles() returns is a List<String>
  2. The reason for this line is: each call to .stream().map(...) produces a Stream<String>, so by doing Stream.of(...), we end up with a Stream<Stream<String>>. The .flatMap(s -> s) is there to flatten it all into one Stream before filtering.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh that makes sense - I missed the change to Stream.of().

.toList();

for (int i = 0; i < metadataFiles.size(); i += batchSize) {
Expand Down