Skip to content

Commit dd2d20e

Browse files
committed
Split vacuum work definition and execution
1 parent bddec49 commit dd2d20e

File tree

1 file changed

+14
-13
lines changed

1 file changed

+14
-13
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java

Lines changed: 14 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import io.trino.filesystem.Location;
2525
import io.trino.filesystem.TrinoFileSystem;
2626
import io.trino.filesystem.TrinoFileSystemFactory;
27+
import io.trino.filesystem.TrinoInputFile;
2728
import io.trino.plugin.base.util.UncheckedCloseable;
2829
import io.trino.plugin.deltalake.DeltaLakeConfig;
2930
import io.trino.plugin.deltalake.DeltaLakeMetadata;
@@ -257,9 +258,8 @@ private void doVacuum(
257258
long transactionLogFiles = 0;
258259
long retainedKnownFiles = 0;
259260
long retainedUnknownFiles = 0;
260-
long removedFiles = 0;
261+
List<TrinoInputFile> filesToDelete = new ArrayList<>();
261262

262-
List<Location> filesToDelete = new ArrayList<>();
263263
FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
264264
while (listing.hasNext()) {
265265
FileEntry entry = listing.next();
@@ -297,19 +297,20 @@ private void doVacuum(
297297
retainedUnknownFiles++;
298298
continue;
299299
}
300-
301300
log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime);
302-
filesToDelete.add(entry.location());
303-
if (filesToDelete.size() == DELETE_BATCH_SIZE) {
304-
fileSystem.deleteFiles(filesToDelete);
305-
removedFiles += filesToDelete.size();
306-
filesToDelete.clear();
307-
}
301+
302+
Location fileLocation = Location.of(location);
303+
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
304+
filesToDelete.add(inputFile);
308305
}
306+
int totalFilesToDelete = filesToDelete.size();
307+
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
308+
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
309+
int start = batchNumber * DELETE_BATCH_SIZE;
310+
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
309311

310-
if (!filesToDelete.isEmpty()) {
311-
fileSystem.deleteFiles(filesToDelete);
312-
removedFiles += filesToDelete.size();
312+
List<TrinoInputFile> batch = filesToDelete.subList(start, end);
313+
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
313314
}
314315

315316
log.info(
@@ -321,7 +322,7 @@ private void doVacuum(
321322
transactionLogFiles,
322323
retainedKnownFiles,
323324
retainedUnknownFiles,
324-
removedFiles);
325+
totalFilesToDelete);
325326
}
326327
}
327328
}

0 commit comments

Comments
 (0)