Skip to content

Commit a191684

Browse files
committed
Use SupportsPrefixOperations for Remove OrphanFile Procedure on Spark 3.5
1 parent 42d3885 commit a191684

File tree

2 files changed

+59
-34
lines changed

2 files changed

+59
-34
lines changed

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -293,28 +293,37 @@ private Dataset<FileURI> validFileIdentDS() {
293293

294294
private Dataset<FileURI> actualFileIdentDS() {
295295
StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
296+
Dataset<String> dataList;
296297
if (compareToFileList == null) {
297-
return toFileURI.apply(listedFileDS());
298+
dataList =
299+
table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix();
298300
} else {
299-
return toFileURI.apply(filteredCompareToFileList());
301+
dataList = filteredCompareToFileList();
300302
}
303+
304+
return toFileURI.apply(dataList);
301305
}
302306

303-
private Dataset<String> listWithPrefix() {
307+
@VisibleForTesting
308+
Dataset<String> listWithPrefix() {
304309
List<String> matchingFiles = Lists.newArrayList();
310+
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
311+
305312
Iterator<org.apache.iceberg.io.FileInfo> iterator =
306313
((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
307314
while (iterator.hasNext()) {
308315
org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
309-
if (fileInfo.createdAtMillis() < olderThanTimestamp) {
316+
if (fileInfo.createdAtMillis() < olderThanTimestamp
317+
&& pathFilter.accept(new Path(fileInfo.location()))) {
310318
matchingFiles.add(fileInfo.location());
311319
}
312320
}
313321
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
314322
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
315323
}
316324

317-
private Dataset<String> listWithoutPrefix() {
325+
@VisibleForTesting
326+
Dataset<String> listWithoutPrefix() {
318327
List<String> subDirs = Lists.newArrayList();
319328
List<String> matchingFiles = Lists.newArrayList();
320329

@@ -349,14 +358,6 @@ private Dataset<String> listWithoutPrefix() {
349358
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
350359
}
351360

352-
private Dataset<String> listedFileDS() {
353-
if (table.io() instanceof SupportsPrefixOperations) {
354-
return listWithPrefix();
355-
} else {
356-
return listWithoutPrefix();
357-
}
358-
}
359-
360361
private static void listDirRecursively(
361362
String dir,
362363
Predicate<FileStatus> predicate,

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -539,9 +539,12 @@ public void testHiddenPartitionPaths() {
539539
waitUntilAfter(System.currentTimeMillis());
540540

541541
SparkActions actions = SparkActions.get();
542+
DeleteOrphanFilesSparkAction action =
543+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
544+
// test list methods by directly instantiating the action
545+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
542546

543-
DeleteOrphanFiles.Result result =
544-
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
547+
DeleteOrphanFiles.Result result = action.execute();
545548

546549
assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
547550
}
@@ -575,9 +578,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
575578
waitUntilAfter(System.currentTimeMillis());
576579

577580
SparkActions actions = SparkActions.get();
581+
DeleteOrphanFilesSparkAction action =
582+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
583+
// test list methods by directly instantiating the action
584+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
578585

579-
DeleteOrphanFiles.Result result =
580-
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
586+
DeleteOrphanFiles.Result result = action.execute();
581587

582588
assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
583589
}
@@ -605,17 +611,23 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOExcep
605611
Path dataPath = new Path(tableLocation + "/data");
606612
FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
607613
Path pathToFileInHiddenFolder = new Path(dataPath, "_c2_trunc/file.txt");
608-
fs.createNewFile(pathToFileInHiddenFolder);
614+
fs.createNewFile(new Path(dataPath, "_c2_trunc/file.txt"));
615+
Path pathToFileInHiddenFolder2 = new Path(dataPath, "_c2_trunc/subfolder/file.txt");
616+
fs.createNewFile(pathToFileInHiddenFolder2);
609617

610618
waitUntilAfter(System.currentTimeMillis());
611619

612620
SparkActions actions = SparkActions.get();
621+
DeleteOrphanFilesSparkAction action =
622+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
623+
// test list methods by directly instantiating the action
624+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
613625

614-
DeleteOrphanFiles.Result result =
615-
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
626+
DeleteOrphanFiles.Result result = action.execute();
616627

617628
assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
618629
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
630+
assertThat(fs.exists(pathToFileInHiddenFolder2)).isTrue();
619631
}
620632

621633
private List<String> snapshotFiles(long snapshotId) {
@@ -675,12 +687,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
675687
waitUntilAfter(System.currentTimeMillis());
676688

677689
SparkActions actions = SparkActions.get();
678-
DeleteOrphanFiles.Result result =
679-
actions
680-
.deleteOrphanFiles(table)
681-
.olderThan(System.currentTimeMillis())
682-
.deleteWith(s -> {})
683-
.execute();
690+
DeleteOrphanFilesSparkAction action =
691+
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {});
692+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
693+
DeleteOrphanFiles.Result result = action.execute();
684694
assertThat(result.orphanFileLocations())
685695
.as("Action should find 1 file")
686696
.isEqualTo(invalidFiles);
@@ -713,8 +723,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
713723

714724
table.refresh();
715725

716-
DeleteOrphanFiles.Result result =
717-
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
726+
DeleteOrphanFilesSparkAction action =
727+
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
728+
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
729+
730+
DeleteOrphanFiles.Result result = action.execute();
718731

719732
assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
720733

@@ -854,12 +867,14 @@ public void testCompareToFileList() throws IOException {
854867
.as("Invalid file should be present")
855868
.isTrue();
856869

857-
DeleteOrphanFiles.Result result3 =
870+
DeleteOrphanFilesSparkAction action3 =
858871
actions
859872
.deleteOrphanFiles(table)
860873
.compareToFileList(compareToFileList)
861-
.olderThan(System.currentTimeMillis())
862-
.execute();
874+
.olderThan(System.currentTimeMillis());
875+
assertThatDatasetsAreEqualIgnoringOrder(action3.listWithPrefix(), action3.listWithoutPrefix());
876+
877+
DeleteOrphanFiles.Result result3 = action3.execute();
863878
assertThat(result3.orphanFileLocations())
864879
.as("Action should delete 1 file")
865880
.isEqualTo(invalidFilePaths);
@@ -885,12 +900,14 @@ public void testCompareToFileList() throws IOException {
885900
.withColumnRenamed("filePath", "file_path")
886901
.withColumnRenamed("lastModified", "last_modified");
887902

888-
DeleteOrphanFiles.Result result4 =
903+
DeleteOrphanFilesSparkAction action4 =
889904
actions
890905
.deleteOrphanFiles(table)
891906
.compareToFileList(compareToFileListWithOutsideLocation)
892-
.deleteWith(s -> {})
893-
.execute();
907+
.deleteWith(s -> {});
908+
assertThatDatasetsAreEqualIgnoringOrder(action4.listWithPrefix(), action4.listWithoutPrefix());
909+
910+
DeleteOrphanFiles.Result result4 = action4.execute();
894911
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
895912
}
896913

@@ -1100,4 +1117,11 @@ private void executeTest(
11001117
spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
11011118
assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
11021119
}
1120+
1121+
private void assertThatDatasetsAreEqualIgnoringOrder(
1122+
Dataset<String> actual, Dataset<String> expected) {
1123+
assertThat(actual.collectAsList())
1124+
.as("same as")
1125+
.containsExactlyInAnyOrderElementsOf(expected.collectAsList());
1126+
}
11031127
}

0 commit comments

Comments
 (0)