Skip to content

Commit 6267e48

Browse files
committed
Use SupportsPrefixOperations for the remove_orphan_files procedure on Spark 3.5
1 parent 42d3885 commit 6267e48

File tree

2 files changed

+49
-31
lines changed

2 files changed

+49
-31
lines changed

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 14 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -293,28 +293,37 @@ private Dataset<FileURI> validFileIdentDS() {
293293

294294
private Dataset<FileURI> actualFileIdentDS() {
295295
StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
296+
Dataset<String> dataList;
296297
if (compareToFileList == null) {
297-
return toFileURI.apply(listedFileDS());
298+
dataList =
299+
table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix();
298300
} else {
299-
return toFileURI.apply(filteredCompareToFileList());
301+
dataList = filteredCompareToFileList();
300302
}
303+
304+
return toFileURI.apply(dataList);
301305
}
302306

303-
private Dataset<String> listWithPrefix() {
307+
@VisibleForTesting
308+
Dataset<String> listWithPrefix() {
304309
List<String> matchingFiles = Lists.newArrayList();
310+
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
311+
305312
Iterator<org.apache.iceberg.io.FileInfo> iterator =
306313
((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
307314
while (iterator.hasNext()) {
308315
org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
309-
if (fileInfo.createdAtMillis() < olderThanTimestamp) {
316+
if (fileInfo.createdAtMillis() < olderThanTimestamp
317+
&& pathFilter.accept(new Path(fileInfo.location()))) {
310318
matchingFiles.add(fileInfo.location());
311319
}
312320
}
313321
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
314322
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
315323
}
316324

317-
private Dataset<String> listWithoutPrefix() {
325+
@VisibleForTesting
326+
Dataset<String> listWithoutPrefix() {
318327
List<String> subDirs = Lists.newArrayList();
319328
List<String> matchingFiles = Lists.newArrayList();
320329

@@ -349,14 +358,6 @@ private Dataset<String> listWithoutPrefix() {
349358
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
350359
}
351360

352-
private Dataset<String> listedFileDS() {
353-
if (table.io() instanceof SupportsPrefixOperations) {
354-
return listWithPrefix();
355-
} else {
356-
return listWithoutPrefix();
357-
}
358-
}
359-
360361
private static void listDirRecursively(
361362
String dir,
362363
Predicate<FileStatus> predicate,

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java

Lines changed: 35 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -575,9 +575,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
575575
waitUntilAfter(System.currentTimeMillis());
576576

577577
SparkActions actions = SparkActions.get();
578+
DeleteOrphanFilesSparkAction action =
579+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
580+
// test list methods by directly instantiating the action
581+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
578582

579-
DeleteOrphanFiles.Result result =
580-
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
583+
DeleteOrphanFiles.Result result = action.execute();
581584

582585
assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
583586
}
@@ -610,9 +613,12 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOExcep
610613
waitUntilAfter(System.currentTimeMillis());
611614

612615
SparkActions actions = SparkActions.get();
616+
DeleteOrphanFilesSparkAction action =
617+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
618+
// test list methods by directly instantiating the action
619+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
613620

614-
DeleteOrphanFiles.Result result =
615-
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
621+
DeleteOrphanFiles.Result result = action.execute();
616622

617623
assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
618624
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
@@ -675,12 +681,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
675681
waitUntilAfter(System.currentTimeMillis());
676682

677683
SparkActions actions = SparkActions.get();
678-
DeleteOrphanFiles.Result result =
679-
actions
680-
.deleteOrphanFiles(table)
681-
.olderThan(System.currentTimeMillis())
682-
.deleteWith(s -> {})
683-
.execute();
684+
DeleteOrphanFilesSparkAction action =
685+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {});
686+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
687+
DeleteOrphanFiles.Result result = action.execute();
684688
assertThat(result.orphanFileLocations())
685689
.as("Action should find 1 file")
686690
.isEqualTo(invalidFiles);
@@ -713,8 +717,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
713717

714718
table.refresh();
715719

716-
DeleteOrphanFiles.Result result =
717-
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
720+
DeleteOrphanFilesSparkAction action =
721+
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
722+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
723+
724+
DeleteOrphanFiles.Result result = action.execute();
718725

719726
assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
720727

@@ -854,12 +861,14 @@ public void testCompareToFileList() throws IOException {
854861
.as("Invalid file should be present")
855862
.isTrue();
856863

857-
DeleteOrphanFiles.Result result3 =
864+
DeleteOrphanFilesSparkAction action3 =
858865
actions
859866
.deleteOrphanFiles(table)
860867
.compareToFileList(compareToFileList)
861-
.olderThan(System.currentTimeMillis())
862-
.execute();
868+
.olderThan(System.currentTimeMillis());
869+
assertThatDatasetsAreEqualIgnoringOrder(action3.listWithPrefix(), action3.listWithoutPrefix());
870+
871+
DeleteOrphanFiles.Result result3 = action3.execute();
863872
assertThat(result3.orphanFileLocations())
864873
.as("Action should delete 1 file")
865874
.isEqualTo(invalidFilePaths);
@@ -885,12 +894,14 @@ public void testCompareToFileList() throws IOException {
885894
.withColumnRenamed("filePath", "file_path")
886895
.withColumnRenamed("lastModified", "last_modified");
887896

888-
DeleteOrphanFiles.Result result4 =
897+
DeleteOrphanFilesSparkAction action4 =
889898
actions
890899
.deleteOrphanFiles(table)
891900
.compareToFileList(compareToFileListWithOutsideLocation)
892-
.deleteWith(s -> {})
893-
.execute();
901+
.deleteWith(s -> {});
902+
assertThatDatasetsAreEqualIgnoringOrder(action4.listWithPrefix(), action4.listWithoutPrefix());
903+
904+
DeleteOrphanFiles.Result result4 = action4.execute();
894905
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
895906
}
896907

@@ -1100,4 +1111,10 @@ private void executeTest(
11001111
spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
11011112
assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
11021113
}
1114+
1115+
private void assertThatDatasetsAreEqualIgnoringOrder(Dataset<String> actual, Dataset<String> expected) {
1116+
assertThat(actual.collectAsList())
1117+
.as("same as")
1118+
.containsExactlyInAnyOrderElementsOf(expected.collectAsList());
1119+
}
11031120
}

0 commit comments

Comments
 (0)