Skip to content

Commit 5c9b49c

Browse files
committed
Draft: use multiple listings when the location provider is an ObjectStoreLocationProvider
1 parent b2d4766 commit 5c9b49c

File tree

5 files changed

+82
-10
lines changed

5 files changed

+82
-10
lines changed

api/src/main/java/org/apache/iceberg/io/LocationProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,6 @@ public interface LocationProvider extends Serializable {
4545
* @return a fully-qualified location URI for a data file
4646
*/
4747
String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename);
48+
49+
String dataLocationRoot();
4850
}

core/src/main/java/org/apache/iceberg/LocationProviders.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,14 @@ public String newDataLocation(PartitionSpec spec, StructLike partitionData, Stri
102102
public String newDataLocation(String filename) {
103103
return String.format("%s/%s", dataLocation, filename);
104104
}
105+
106+
@Override
107+
public String dataLocationRoot() {
108+
return dataLocation;
109+
}
105110
}
106111

107-
static class ObjectStoreLocationProvider implements LocationProvider {
112+
public static class ObjectStoreLocationProvider implements LocationProvider {
108113

109114
private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
110115
// Length of entropy generated in the file location
@@ -228,5 +233,10 @@ private String dirsFromHash(String hash) {
228233

229234
return hashWithDirs.toString();
230235
}
236+
237+
@Override
238+
public String dataLocationRoot() {
239+
return storageLocation;
240+
}
231241
}
232242
}

core/src/test/java/org/apache/iceberg/TestLocationProvider.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public String newDataLocation(String filename) {
5757
public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
5858
throw new RuntimeException("Test custom provider does not expect any invocation");
5959
}
60+
61+
@Override
62+
public String dataLocationRoot() {
63+
return tableLocation;
64+
}
6065
}
6166

6267
// publicly visible for testing to be dynamically loaded
@@ -72,6 +77,11 @@ public String newDataLocation(String filename) {
7277
public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
7378
throw new RuntimeException("Test custom provider does not expect any invocation");
7479
}
80+
81+
@Override
82+
public String dataLocationRoot() {
83+
return "";
84+
}
7585
}
7686

7787
// publicly visible for testing to be dynamically loaded
@@ -88,6 +98,11 @@ public String newDataLocation(String filename) {
8898
public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
8999
throw new RuntimeException("Invalid provider should have not been instantiated!");
90100
}
101+
102+
@Override
103+
public String dataLocationRoot() {
104+
return "";
105+
}
91106
}
92107

93108
// publicly visible for testing to be dynamically loaded

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.hadoop.fs.FileSystem;
4444
import org.apache.hadoop.fs.Path;
4545
import org.apache.hadoop.fs.PathFilter;
46+
import org.apache.iceberg.LocationProviders;
4647
import org.apache.iceberg.PartitionSpec;
4748
import org.apache.iceberg.Table;
4849
import org.apache.iceberg.actions.DeleteOrphanFiles;
@@ -306,25 +307,46 @@ private Dataset<FileURI> actualFileIdentDS() {
306307
}
307308

308309
@VisibleForTesting
309-
Dataset<String> listWithPrefix() {
310+
List<String> listLocationWithPrefix(String location, PathFilter pathFilter) {
310311
List<String> matchingFiles = Lists.newArrayList();
311-
// listPrefix only returns files. so we additionally need to check parent folders for each file
312-
// in following example file itself is not filtered out,
313-
// but it should be excluded due to its parent folder: `_c2_trunc`
314-
// "/data/_c2_trunc/file.txt"
315-
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);
316-
317312
Iterator<org.apache.iceberg.io.FileInfo> iterator =
318313
((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
319314
while (iterator.hasNext()) {
320315
org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
321-
// NOTE: check the path relative to table location. To avoid checking un necessary root
322-
// folders
316+
// NOTE: To avoid checking unnecessary root folders, check the path relative to table
317+
// location.
323318
Path relativeFilePath = new Path(fileInfo.location().replace(location, ""));
324319
if (fileInfo.createdAtMillis() < olderThanTimestamp && pathFilter.accept(relativeFilePath)) {
325320
matchingFiles.add(fileInfo.location());
326321
}
327322
}
323+
return matchingFiles;
324+
}
325+
326+
@VisibleForTesting
327+
Dataset<String> listWithPrefix() {
328+
List<String> matchingFiles = Lists.newArrayList();
329+
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);
330+
331+
if (table.locationProvider() instanceof LocationProviders.ObjectStoreLocationProvider) {
332+
// ObjectStoreLocationProvider generates hierarchical prefixes in a binary fashion
333+
// (0000/, 0001/, 0010/, 0011/, ...).
334+
// This allows us to parallelize listing operations across these prefixes.
335+
List<String> prefixes =
336+
List.of(
337+
"/0000", "/0001", "/0010", "/0011", "/0100", "/0101", "/0110", "/0111", "/1000",
338+
"/1001", "/1010", "/1011", "/1100", "/1101", "/1110", "/1111");
339+
340+
String tableDataLocationRoot = table.locationProvider().dataLocationRoot();
341+
for (String prefix : prefixes) {
342+
List<String> result = listLocationWithPrefix(tableDataLocationRoot + prefix, pathFilter);
343+
matchingFiles.addAll(result);
344+
}
345+
346+
} else {
347+
matchingFiles.addAll(listLocationWithPrefix(location, pathFilter));
348+
}
349+
328350
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
329351
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
330352
}

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.iceberg.actions.DeleteOrphanFiles;
6161
import org.apache.iceberg.catalog.Namespace;
6262
import org.apache.iceberg.catalog.TableIdentifier;
63+
import org.apache.iceberg.data.GenericRecord;
6364
import org.apache.iceberg.exceptions.ValidationException;
6465
import org.apache.iceberg.hadoop.HadoopCatalog;
6566
import org.apache.iceberg.hadoop.HadoopTables;
@@ -515,6 +516,7 @@ public void testManyLeafPartitions() {
515516

516517
@TestTemplate
517518
public void testHiddenPartitionPaths() {
519+
//
518520
Schema schema =
519521
new Schema(
520522
optional(1, "c1", Types.IntegerType.get()),
@@ -1090,6 +1092,27 @@ public void testRemoveOrphanFileActionWithDeleteMode() {
10901092
DeleteOrphanFiles.PrefixMismatchMode.DELETE);
10911093
}
10921094

1095+
@TestTemplate
1096+
public void testLocationProvider() {
1097+
ImmutableMap<String, String> props =
1098+
ImmutableMap.of(
1099+
TableProperties.FORMAT_VERSION,
1100+
String.valueOf(formatVersion),
1101+
TableProperties.OBJECT_STORE_ENABLED,
1102+
"true");
1103+
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
1104+
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).truncate("c2", 2).identity("c3").build();
1105+
Table table = TABLES.create(SCHEMA, spec, props, tableLocation);
1106+
System.out.println(">: " + table.location());
1107+
System.out.println(">: " + table.locationProvider().newDataLocation("test"));
1108+
GenericRecord record =
1109+
GenericRecord.create(SCHEMA).copy(Map.of("c1", "54321", "c2", "bbbb", "c3", "cccc"));
1110+
System.out.println(">: " + table.locationProvider().newDataLocation(spec, record, "test"));
1111+
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
1112+
assertThat(table.locationProvider().newDataLocation("test")).as("equals").isEqualTo("xxx");
1113+
assertThat(table.locationProvider().getClass().toString()).as("equals").isEqualTo("xxx");
1114+
}
1115+
10931116
protected String randomName(String prefix) {
10941117
return prefix + UUID.randomUUID().toString().replace("-", "");
10951118
}

0 commit comments

Comments
 (0)