diff --git a/polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
index e17e24fd7..f906daba3 100644
--- a/polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
+++ b/polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
@@ -42,9 +42,11 @@ import org.slf4j.LoggerFactory;
 
 /**
- * {@link TaskHandler} responsible for deleting all of the files in a manifest and the manifest
- * itself. Since data files may be present in multiple manifests across different snapshots, we
- * assume a data file that doesn't exist is missing because it was already deleted by another task.
+ * {@link TaskHandler} responsible for deleting table files: 1. Manifest files: deletes all of the
+ * files in a manifest and the manifest itself. Since data files may be present in multiple
+ * manifests across different snapshots, we assume a data file that doesn't exist is missing
+ * because it was already deleted by another task. 2. Table content files: deletes previous
+ * metadata and statistics files, which are grouped and deleted in batches.
  */
 public class ManifestFileCleanupTaskHandler implements TaskHandler {
   public static final int MAX_ATTEMPTS = 3;
@@ -68,58 +70,107 @@ public boolean canHandleTask(TaskEntity task) {
 
   @Override
   public boolean handleTask(TaskEntity task) {
     ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
-    ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
     TableIdentifier tableId = cleanupTask.getTableId();
     try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
-
-      // if the file doesn't exist, we assume that another task execution was successful, but failed
-      // to drop the task entity. Log a warning and return success
-      if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
+      if (cleanupTask.getManifestFileData() != null) {
+        ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
+        return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
+      } else if (cleanupTask.getContentFileBatch() != null) {
+        return cleanUpContentFiles(cleanupTask.getContentFileBatch(), authorizedFileIO, tableId);
+      } else {
         LOGGER
             .atWarn()
-            .addKeyValue("manifestFile", manifestFile.path())
             .addKeyValue("tableId", tableId)
-            .log("Manifest cleanup task scheduled, but manifest file doesn't exist");
+            .log("Cleanup task scheduled, but task data contains no manifest or content files");
         return true;
       }
+    }
+  }
 
-      ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, authorizedFileIO);
-      List<CompletableFuture<Void>> dataFileDeletes =
-          StreamSupport.stream(
-                  Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE),
-                  false)
-              .map(
-                  file ->
-                      tryDelete(
-                          tableId, authorizedFileIO, manifestFile, file.path().toString(), null, 1))
-              .toList();
-      LOGGER.debug(
-          "Scheduled {} data files to be deleted from manifest {}",
-          dataFileDeletes.size(),
-          manifestFile.path());
-      try {
-        // wait for all data files to be deleted, then wait for the manifest itself to be deleted
-        CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
-            .thenCompose(
-                (v) -> {
-                  LOGGER
-                      .atInfo()
-                      .addKeyValue("manifestFile", manifestFile.path())
-                      .log("All data files in manifest deleted - deleting manifest");
-                  return tryDelete(
-                      tableId, authorizedFileIO, manifestFile, manifestFile.path(), null, 1);
-                })
-            .get();
-        return true;
-      } catch (InterruptedException e) {
-        LOGGER.error(
-            "Interrupted exception deleting data files from manifest {}", manifestFile.path(), e);
-        throw new RuntimeException(e);
-      } catch (ExecutionException e) {
-        LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
-        return false;
-      }
+  private boolean cleanUpManifestFile(
+      ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) {
+    // if the file doesn't exist, we assume that another task execution was successful, but
+    // failed to drop the task entity. Log a warning and return success
+    if (!TaskUtils.exists(manifestFile.path(), fileIO)) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("manifestFile", manifestFile.path())
+          .addKeyValue("tableId", tableId)
+          .log("Manifest cleanup task scheduled, but manifest file doesn't exist");
+      return true;
+    }
+
+    ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, fileIO);
+    List<CompletableFuture<Void>> dataFileDeletes =
+        StreamSupport.stream(
+                Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE),
+                false)
+            .map(file -> tryDelete(tableId, fileIO, manifestFile, file.path().toString(), null, 1))
+            .toList();
+    LOGGER.debug(
+        "Scheduled {} data files to be deleted from manifest {}",
+        dataFileDeletes.size(),
+        manifestFile.path());
+    try {
+      // wait for all data files to be deleted, then wait for the manifest itself to be deleted
+      CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
+          .thenCompose(
+              (v) -> {
+                LOGGER
+                    .atInfo()
+                    .addKeyValue("manifestFile", manifestFile.path())
+                    .log("All data files in manifest deleted - deleting manifest");
+                return tryDelete(tableId, fileIO, manifestFile, manifestFile.path(), null, 1);
+              })
+          .get();
+      return true;
+    } catch (InterruptedException e) {
+      LOGGER.error(
+          "Interrupted exception deleting data files from manifest {}", manifestFile.path(), e);
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
+      return false;
+    }
+  }
+
+  private boolean cleanUpContentFiles(
+      List<String> contentFileBatch, FileIO fileIO, TableIdentifier tableId) {
+    List<String> validFiles =
+        contentFileBatch.stream().filter(file -> TaskUtils.exists(file, fileIO)).toList();
+    if (validFiles.isEmpty()) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("contentFileBatch", contentFileBatch.toString())
+          .addKeyValue("tableId", tableId)
+          .log("Table content cleanup task scheduled, but none of the files in the batch exist");
+      return true;
+    }
+
+    // Schedule the deletion for each file asynchronously
+    List<CompletableFuture<Void>> deleteFutures =
+        validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file, null, 1)).toList();
+
+    // Wait for all delete operations to finish
+    try {
+      CompletableFuture<Void> allDeletes =
+          CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
+      allDeletes.join();
+    } catch (Exception e) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("contentFileBatch", contentFileBatch.toString())
+          .addKeyValue("tableId", tableId)
+          .log("Exception detected during content file batch deletion", e);
+    }
+
+    LOGGER
+        .atInfo()
+        .addKeyValue("contentFileBatch", contentFileBatch.toString())
+        .addKeyValue("tableId", tableId)
+        .log("Content file batch deletion has completed");
+
+    return true;
   }
 
   private static ManifestFile decodeManifestData(String manifestFileData) {
@@ -134,16 +185,16 @@ private CompletableFuture<Void> tryDelete(
       TableIdentifier tableId,
       FileIO fileIO,
       ManifestFile manifestFile,
-      String dataFile,
+      String file,
       Throwable e,
       int attempt) {
     if (e != null && attempt <= MAX_ATTEMPTS) {
       LOGGER
           .atWarn()
-          .addKeyValue("dataFile", dataFile)
+          .addKeyValue("file", file)
           .addKeyValue("attempt", attempt)
           .addKeyValue("error", e.getMessage())
-          .log("Error encountered attempting to delete data file");
+          .log("Error encountered attempting to delete file");
     }
     if (attempt > MAX_ATTEMPTS && e != null) {
       return CompletableFuture.failedFuture(e);
@@ -155,15 +206,15 @@ private CompletableFuture<Void> tryDelete(
               // file's existence, but then it is deleted before we have a chance to
               // send the delete request. In such a case, we should retry
               // and find
-              if (TaskUtils.exists(dataFile, fileIO)) {
-                fileIO.deleteFile(dataFile);
+              if (TaskUtils.exists(file, fileIO)) {
+                fileIO.deleteFile(file);
               } else {
                 LOGGER
                     .atInfo()
-                    .addKeyValue("dataFile", dataFile)
-                    .addKeyValue("manifestFile", manifestFile.path())
+                    .addKeyValue("file", file)
+                    .addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "")
                     .addKeyValue("tableId", tableId)
-                    .log("Manifest cleanup task scheduled, but data file doesn't exist");
+                    .log("Table file cleanup task scheduled, but file doesn't exist");
               }
             },
             executorService)
@@ -171,11 +222,11 @@ private CompletableFuture<Void> tryDelete(
             newEx -> {
               LOGGER
                   .atWarn()
-                  .addKeyValue("dataFile", dataFile)
-                  .addKeyValue("tableIdentifer", tableId)
-                  .addKeyValue("manifestFile", manifestFile.path())
+                  .addKeyValue("file", file)
+                  .addKeyValue("tableIdentifier", tableId)
+                  .addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "")
                   .log("Exception caught deleting data file from manifest", newEx);
-              return tryDelete(tableId, fileIO, manifestFile, dataFile, newEx, attempt + 1);
+              return tryDelete(tableId, fileIO, manifestFile, file, newEx, attempt + 1);
             },
             CompletableFuture.delayedExecutor(
                 FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
@@ -185,12 +236,18 @@ private CompletableFuture<Void> tryDelete(
   public static final class ManifestCleanupTask {
     private TableIdentifier tableId;
     private String manifestFileData;
+    private List<String> contentFileBatch;
 
     public ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) {
       this.tableId = tableId;
       this.manifestFileData = manifestFileData;
     }
 
+    public ManifestCleanupTask(TableIdentifier tableId, List<String> contentFileBatch) {
+      this.tableId = tableId;
+      this.contentFileBatch = contentFileBatch;
+    }
+
     public ManifestCleanupTask() {}
 
     public TableIdentifier getTableId() {
@@ -209,17 +266,26 @@ public void setManifestFileData(String manifestFileData) {
       this.manifestFileData = manifestFileData;
     }
 
+    public List<String> getContentFileBatch() {
+      return contentFileBatch;
+    }
+
+    public void setContentFileBatch(List<String> contentFileBatch) {
+      this.contentFileBatch = contentFileBatch;
+    }
+
     @Override
     public boolean equals(Object object) {
       if (this == object) return true;
       if (!(object instanceof ManifestCleanupTask that)) return false;
       return Objects.equals(tableId, that.tableId)
-          && Objects.equals(manifestFileData, that.manifestFileData);
+          && Objects.equals(manifestFileData, that.manifestFileData)
+          && Objects.equals(contentFileBatch, that.contentFileBatch);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(tableId, manifestFileData);
+      return Objects.hash(tableId, manifestFileData, contentFileBatch);
     }
   }
 }
diff --git a/polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
index 7f323174b..3f5486a1d 100644
--- a/polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
+++ b/polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
@@ -18,11 +18,14 @@
  */
 package org.apache.polaris.service.task;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.io.FileIO;
@@ -49,6 +52,7 @@ public class TableCleanupTaskHandler implements TaskHandler {
   private final TaskExecutor taskExecutor;
   private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final Function<TaskEntity, FileIO> fileIOSupplier;
+  private static final int BATCH_SIZE = 10;
 
   public TableCleanupTaskHandler(
       TaskExecutor taskExecutor,
@@ -102,7 +106,7 @@ public boolean handleTask(TaskEntity cleanupTask) {
       // read the manifest list for each snapshot. dedupe the manifest files and schedule a
       // cleanupTask
       // for each manifest file and its data files to be deleted
-      List<TaskEntity> taskEntities =
+      Stream<TaskEntity> manifestCleanupTasks =
           tableMetadata.snapshots().stream()
               .flatMap(sn -> sn.allManifests(fileIO).stream())
               // distinct by manifest path, since multiple snapshots will contain the same
@@ -142,8 +146,40 @@ public boolean handleTask(TaskEntity cleanupTask) {
                     // copy the internal properties, which will have storage info
                     .setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
                     .build();
-              })
-              .toList();
+              });
+
+      Stream<TaskEntity> contentFileCleanupTasks =
+          getContentFileBatch(tableMetadata, fileIO).stream()
+              .map(
+                  fileBatch -> {
+                    String taskName =
+                        String.join(
+                            "_",
+                            cleanupTask.getName(),
+                            fileBatch.toString(),
+                            UUID.randomUUID().toString());
+                    LOGGER
+                        .atDebug()
+                        .addKeyValue("taskName", taskName)
+                        .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
+                        .addKeyValue("fileBatch", fileBatch)
+                        .log(
+                            "Queueing task to delete content files (previous metadata and statistics files)");
+                    return new TaskEntity.Builder()
+                        .setName(taskName)
+                        .setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId())
+                        .setCreateTimestamp(polarisCallContext.getClock().millis())
+                        .withTaskType(AsyncTaskType.FILE_CLEANUP)
+                        .withData(
+                            new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
+                                tableEntity.getTableIdentifier(), fileBatch))
+                        .setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
+                        .build();
+                  });
+
+      List<TaskEntity> taskEntities =
+          Stream.concat(manifestCleanupTasks, contentFileCleanupTasks).toList();
+
       List<PolarisBaseEntity> createdTasks =
           metaStoreManager
               .createEntitiesIfNotExist(polarisCallContext, null, taskEntities)
@@ -154,10 +190,12 @@ public boolean handleTask(TaskEntity cleanupTask) {
               .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
               .addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
               .addKeyValue("taskCount", taskEntities.size())
-              .log("Successfully queued tasks to delete manifests - deleting table metadata file");
+              .log(
+                  "Successfully queued tasks to delete manifests, previous metadata, and statistics files - deleting table metadata file");
       for (PolarisBaseEntity createdTask : createdTasks) {
         taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext());
       }
+
       fileIO.deleteFile(tableEntity.getMetadataLocation());
 
       return true;
@@ -165,4 +203,19 @@ public boolean handleTask(TaskEntity cleanupTask) {
     }
     return false;
   }
+
+  private List<List<String>> getContentFileBatch(TableMetadata tableMetadata, FileIO fileIO) {
+    List<List<String>> result = new ArrayList<>();
+    List<String> contentFiles =
+        Stream.concat(
+                tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
+                tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
+            .filter(file -> TaskUtils.exists(file, fileIO))
+            .toList();
+
+    for (int i = 0; i < contentFiles.size(); i += BATCH_SIZE) {
+      result.add(contentFiles.subList(i, Math.min(i + BATCH_SIZE, contentFiles.size())));
+    }
+    return result;
+  }
 }
diff --git a/polaris-service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java b/polaris-service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java
index 711e661f2..862e90237 100644
--- a/polaris-service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java
+++ b/polaris-service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java
@@ -19,17 +19,24 @@
 package org.apache.polaris.service.task;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatPredicate;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
@@ -226,4 +233,228 @@ public void deleteFile(String location) {
       assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path);
     }
   }
+
+  @Test
+  public void testContentFileCleanup() throws IOException {
+    PolarisCallContext polarisCallContext =
+        new PolarisCallContext(
+            metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
+            new PolarisDefaultDiagServiceImpl());
+    try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
+      CallContext.setCurrentContext(callCtx);
+      FileIO fileIO =
+          new InMemoryFileIO() {
+            @Override
+            public void close() {
+              // no-op
+            }
+          };
+      TableIdentifier tableIdentifier =
+          TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+      ManifestFileCleanupTaskHandler handler =
+          new ManifestFileCleanupTaskHandler((task) -> fileIO, Executors.newSingleThreadExecutor());
+
+      long snapshotId1 = 100L;
+      ManifestFile manifestFile1 =
+          TaskTestUtils.manifestFile(
+              fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet");
+      ManifestFile manifestFile2 =
+          TaskTestUtils.manifestFile(
+              fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet");
+      Snapshot snapshot =
+          TaskTestUtils.newSnapshot(
+              fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2);
+      StatisticsFile statisticsFile1 =
+          TaskTestUtils.writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      String firstMetadataFile = "v1-295495059.metadata.json";
+      TableMetadata firstMetadata =
+          TaskTestUtils.writeTableMetadata(
+              fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
+      assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
+
+      ManifestFile manifestFile3 =
+          TaskTestUtils.manifestFile(
+              fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet");
+      Snapshot snapshot2 =
+          TaskTestUtils.newSnapshot(
+              fileIO,
+              "manifestList2.avro",
+              snapshot.sequenceNumber() + 1,
+              snapshot.snapshotId() + 1,
+              snapshot.snapshotId(),
+              manifestFile1,
+              manifestFile3); // exclude manifest2 from the new snapshot
+      StatisticsFile statisticsFile2 =
+          TaskTestUtils.writeStatsFile(
+              snapshot2.snapshotId(),
+              snapshot2.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      String secondMetadataFile = "v1-295495060.metadata.json";
+      TableMetadata secondMetadata =
+          TaskTestUtils.writeTableMetadata(
+              fileIO,
+              secondMetadataFile,
+              firstMetadata,
+              firstMetadataFile,
+              List.of(statisticsFile2),
+              snapshot2);
+      assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
+      assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
+
+      List<String> cleanupFiles =
+          Stream.concat(
+                  secondMetadata.previousFiles().stream()
+                      .map(TableMetadata.MetadataLogEntry::file)
+                      .filter(file -> TaskUtils.exists(file, fileIO)),
+                  secondMetadata.statisticsFiles().stream()
+                      .map(StatisticsFile::path)
+                      .filter(file -> TaskUtils.exists(file, fileIO)))
+              .toList();
+
+      TaskEntity task =
+          new TaskEntity.Builder()
+              .withTaskType(AsyncTaskType.FILE_CLEANUP)
+              .withData(
+                  new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
+                      tableIdentifier, cleanupFiles))
+              .setName(UUID.randomUUID().toString())
+              .build();
+
+      assertThatPredicate(handler::canHandleTask).accepts(task);
+      assertThatPredicate(handler::handleTask).accepts(task);
+
+      assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
+          .rejects(firstMetadataFile);
+      assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
+          .rejects(statisticsFile1.path());
+      assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
+          .rejects(statisticsFile2.path());
+    }
+  }
+
+  @Test
+  public void testContentFileCleanupIfFileNotExist() throws IOException {
+    PolarisCallContext polarisCallContext =
+        new PolarisCallContext(
+            metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
+            new PolarisDefaultDiagServiceImpl());
+    try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
+      CallContext.setCurrentContext(callCtx);
+      FileIO fileIO = new InMemoryFileIO();
+      TableIdentifier tableIdentifier =
+          TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+      ManifestFileCleanupTaskHandler handler =
+          new ManifestFileCleanupTaskHandler((task) -> fileIO, Executors.newSingleThreadExecutor());
+      long snapshotId = 100L;
+      ManifestFile manifestFile =
+          TaskTestUtils.manifestFile(
+              fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet");
+      TestSnapshot snapshot =
+          TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
+      String metadataFile = "v1-49494949.metadata.json";
+      StatisticsFile statisticsFile =
+          TaskTestUtils.writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot);
+
+      fileIO.deleteFile(statisticsFile.path());
+      assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse();
+
+      TaskEntity task =
+          new TaskEntity.Builder()
+              .withTaskType(AsyncTaskType.FILE_CLEANUP)
+              .withData(
+                  new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
+                      tableIdentifier, List.of(statisticsFile.path())))
+              .setName(UUID.randomUUID().toString())
+              .build();
+      assertThatPredicate(handler::canHandleTask).accepts(task);
+      assertThatPredicate(handler::handleTask).accepts(task);
+    }
+  }
+
+  @Test
+  public void testCleanupWithRetries() throws IOException {
+    PolarisCallContext polarisCallContext =
+        new PolarisCallContext(
+            metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
+            new PolarisDefaultDiagServiceImpl());
+    try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
+      CallContext.setCurrentContext(callCtx);
+      Map<String, AtomicInteger> retryCounter = new HashMap<>();
+      FileIO fileIO =
+          new InMemoryFileIO() {
+            @Override
+            public void close() {
+              // no-op
+            }
+
+            @Override
+            public void deleteFile(String location) {
+              int attempts =
+                  retryCounter
+                      .computeIfAbsent(location, k -> new AtomicInteger(0))
+                      .incrementAndGet();
+              if (attempts < 3) {
+                throw new RuntimeException("Simulating failure to test retries");
+              } else {
+                super.deleteFile(location);
+              }
+            }
+          };
+      TableIdentifier tableIdentifier =
+          TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+      ManifestFileCleanupTaskHandler handler =
+          new ManifestFileCleanupTaskHandler((task) -> fileIO, Executors.newSingleThreadExecutor());
+      long snapshotId = 100L;
+      ManifestFile manifestFile =
+          TaskTestUtils.manifestFile(
+              fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet");
+      TestSnapshot snapshot =
+          TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
+      String metadataFile = "v1-49494949.metadata.json";
+      StatisticsFile statisticsFile =
+          TaskTestUtils.writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot);
+      assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue();
+
+      TaskEntity task =
+          new TaskEntity.Builder()
+              .withTaskType(AsyncTaskType.FILE_CLEANUP)
+              .withData(
+                  new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
+                      tableIdentifier, List.of(statisticsFile.path())))
+              .setName(UUID.randomUUID().toString())
+              .build();
+
+      CompletableFuture<Void> future =
+          CompletableFuture.runAsync(
+              () -> {
+                assertThatPredicate(handler::canHandleTask).accepts(task);
+                handler.handleTask(task); // this will schedule the batch deletion
+              });
+
+      // Wait for all async tasks to finish
+      future.join();
+
+      // Check if the file was successfully deleted after retries
+      assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse();
+
+      // Ensure that retries happened as expected
+      assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue();
+      assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3);
+    }
+  }
 }
diff --git a/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java b/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java
index ab9f9324c..17ae80618 100644
--- a/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java
+++ b/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java
@@ -22,10 +22,13 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
@@ -76,7 +79,13 @@ public void testTableCleanup() throws IOException {
       TestSnapshot snapshot =
"manifestList.avro", 1, snapshotId, 99L, manifestFile); String metadataFile = "v1-49494949.metadata.json"; - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot); + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); TaskEntity task = new TaskEntity.Builder() @@ -97,19 +106,30 @@ public void testTableCleanup() throws IOException { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(polarisCallContext, "test", 1) + .loadTasks(polarisCallContext, "test", 2) .getEntities()) - .hasSize(1) - .satisfiesExactly( + .hasSize(2) + .satisfiesExactlyInAnyOrder( taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) - .extracting(entity -> TaskEntity.of(entity)) + .extracting(TaskEntity::of) .returns(AsyncTaskType.FILE_CLEANUP, TaskEntity::getTaskType) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile))), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns(AsyncTaskType.FILE_CLEANUP, TaskEntity::getTaskType) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, List.of(statisticsFile.path())), entity -> entity.readData( ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); @@ -243,7 +263,7 @@ public void close() { taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) - .extracting(entity -> TaskEntity.of(entity)) + .extracting(TaskEntity::of) .returns(AsyncTaskType.FILE_CLEANUP, TaskEntity::getTaskType) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( @@ -255,7 +275,7 @@ public void close() { taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) - .extracting(entity -> TaskEntity.of(entity)) + .extracting(TaskEntity::of) .returns(AsyncTaskType.FILE_CLEANUP, TaskEntity::getTaskType) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( @@ -303,7 +323,20 @@ public void testTableCleanupMultipleSnapshots() throws IOException { manifestFile1, manifestFile3); // exclude manifest2 from the new snapshot String metadataFile = "v1-295495059.metadata.json"; - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot, snapshot2); + StatisticsFile statisticsFile1 = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + StatisticsFile statisticsFile2 = + TaskTestUtils.writeStatsFile( + snapshot2.snapshotId(), + snapshot2.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata( + fileIO, metadataFile, List.of(statisticsFile1, statisticsFile2), snapshot, snapshot2); TaskEntity task = new TaskEntity.Builder() @@ -318,21 +351,181 @@ public void testTableCleanupMultipleSnapshots() throws IOException { Assertions.assertThatPredicate(handler::canHandleTask).accepts(task); CallContext.setCurrentContext(CallContext.of(realmContext, polarisCallContext)); + handler.handleTask(task); - 
assertThat( - metaStoreManagerFactory - .getOrCreateMetaStoreManager(realmContext) - .loadTasks(polarisCallContext, "test", 5) - .getEntities()) + List entities = + metaStoreManagerFactory + .getOrCreateMetaStoreManager(realmContext) + .loadTasks(polarisCallContext, "test", 5) + .getEntities(); + + List manifestCleanupTasks = + entities.stream() + .filter( + entity -> { + AsyncTaskType taskType = TaskEntity.of(entity).getTaskType(); + return taskType == AsyncTaskType.FILE_CLEANUP; + }) + .toList(); + + assertThat(manifestCleanupTasks) + // all three manifests should be present, even though one is excluded from the latest + // snapshot + .hasSize(4) + .satisfiesExactlyInAnyOrder( + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, + Base64.encodeBase64String(ManifestFiles.encode(manifestFile1))), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, + Base64.encodeBase64String(ManifestFiles.encode(manifestFile2))), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, + Base64.encodeBase64String(ManifestFiles.encode(manifestFile3))), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, + List.of(statisticsFile1.path(), statisticsFile2.path())), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); + } + } + + @Test + public void testTableCleanupMultipleMetadata() throws IOException { + PolarisCallContext polarisCallContext = + new PolarisCallContext( + metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), + new PolarisDefaultDiagServiceImpl()); + try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = + TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + TableCleanupTaskHandler handler = + new TableCleanupTaskHandler(Mockito.mock(), metaStoreManagerFactory, (task) -> fileIO); + long snapshotId1 = 100L; + ManifestFile manifestFile1 = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); + ManifestFile manifestFile2 = + TaskTestUtils.manifestFile( + fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); + Snapshot snapshot = + TaskTestUtils.newSnapshot( + fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); + StatisticsFile statisticsFile1 = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + 
+              fileIO);
+      String firstMetadataFile = "v1-295495059.metadata.json";
+      TableMetadata firstMetadata =
+          TaskTestUtils.writeTableMetadata(
+              fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
+      assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
+
+      ManifestFile manifestFile3 =
+          TaskTestUtils.manifestFile(
+              fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet");
+      Snapshot snapshot2 =
+          TaskTestUtils.newSnapshot(
+              fileIO,
+              "manifestList2.avro",
+              snapshot.sequenceNumber() + 1,
+              snapshot.snapshotId() + 1,
+              snapshot.snapshotId(),
+              manifestFile1,
+              manifestFile3); // exclude manifest2 from the new snapshot
+      StatisticsFile statisticsFile2 =
+          TaskTestUtils.writeStatsFile(
+              snapshot2.snapshotId(),
+              snapshot2.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      String secondMetadataFile = "v1-295495060.metadata.json";
+      TaskTestUtils.writeTableMetadata(
+          fileIO,
+          secondMetadataFile,
+          firstMetadata,
+          firstMetadataFile,
+          List.of(statisticsFile2),
+          snapshot2);
+      assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
+      assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
+
+      TaskEntity task =
+          new TaskEntity.Builder()
+              .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
+              .withData(
+                  new TableLikeEntity.Builder(tableIdentifier, secondMetadataFile)
+                      .setName("table1")
+                      .setCatalogId(1)
+                      .setCreateTimestamp(100)
+                      .build())
+              .build();
+
+      Assertions.assertThatPredicate(handler::canHandleTask).accepts(task);
+
+      CallContext.setCurrentContext(CallContext.of(realmContext, polarisCallContext));
+
+      handler.handleTask(task);
+
+      List<PolarisBaseEntity> entities =
+          metaStoreManagerFactory
+              .getOrCreateMetaStoreManager(realmContext)
+              .loadTasks(polarisCallContext, "test", 6)
+              .getEntities();
+
+      List<PolarisBaseEntity> manifestCleanupTasks =
+          entities.stream()
+              .filter(
+                  entity -> {
+                    AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+                    return taskType == AsyncTaskType.FILE_CLEANUP;
+                  })
+              .toList();
+
+      assertThat(manifestCleanupTasks)
           // all three manifests should be present, even though one is excluded from the latest
           // snapshot
-          .hasSize(3)
+          .hasSize(4)
           .satisfiesExactlyInAnyOrder(
               taskEntity ->
                   assertThat(taskEntity)
                       .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
-                      .extracting(entity -> TaskEntity.of(entity))
+                      .extracting(TaskEntity::of)
                       .returns(
                           new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
                               tableIdentifier,
@@ -343,7 +536,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
               taskEntity ->
                   assertThat(taskEntity)
                       .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
-                      .extracting(entity -> TaskEntity.of(entity))
+                      .extracting(TaskEntity::of)
                       .returns(
                           new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
                               tableIdentifier,
@@ -354,11 +547,25 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
               taskEntity ->
                   assertThat(taskEntity)
                       .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
-                      .extracting(entity -> TaskEntity.of(entity))
+                      .extracting(TaskEntity::of)
                       .returns(
                           new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
                               tableIdentifier,
                               Base64.encodeBase64String(ManifestFiles.encode(manifestFile3))),
+                          entity ->
+                              entity.readData(
+                                  ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
+              taskEntity ->
+                  assertThat(taskEntity)
+                      .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
+                      .extracting(TaskEntity::of)
+                      .returns(
+                          new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
+                              tableIdentifier,
+                              List.of(
+                                  firstMetadataFile,
+                                  statisticsFile1.path(),
+                                  statisticsFile2.path())),
                           entity ->
                               entity.readData(
                                   ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
diff --git a/polaris-service/src/test/java/org/apache/polaris/service/task/TaskTestUtils.java b/polaris-service/src/test/java/org/apache/polaris/service/task/TaskTestUtils.java
index 1e5612e25..5864ff160 100644
--- a/polaris-service/src/test/java/org/apache/polaris/service/task/TaskTestUtils.java
+++ b/polaris-service/src/test/java/org/apache/polaris/service/task/TaskTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.service.task;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
@@ -26,6 +27,8 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
@@ -33,12 +36,16 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.types.Types;
 import org.jetbrains.annotations.NotNull;
 
@@ -62,25 +69,55 @@ static ManifestFile manifestFile(
     return writer.toManifestFile();
   }
 
-  static void writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
+  static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
       throws IOException {
-    TableMetadata.Builder tmBuidler =
-        TableMetadata.buildFromEmpty()
-            .setLocation("path/to/table")
-            .addSchema(
-                new Schema(
-                    List.of(Types.NestedField.of(1, false, "field1", Types.StringType.get()))),
-                1)
-            .addSortOrder(SortOrder.unsorted())
-            .assignUUID(UUID.randomUUID().toString())
-            .addPartitionSpec(PartitionSpec.unpartitioned());
+    return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots);
+  }
+
+  static TableMetadata writeTableMetadata(
+      FileIO fileIO,
+      String metadataFile,
+      List<StatisticsFile> statisticsFiles,
+      Snapshot... snapshots)
+      throws IOException {
+    return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots);
+  }
+
+  static TableMetadata writeTableMetadata(
+      FileIO fileIO,
+      String metadataFile,
+      TableMetadata prevMetadata,
+      String prevMetadataFile,
+      List<StatisticsFile> statisticsFiles,
+      Snapshot... snapshots)
+      throws IOException {
+    TableMetadata.Builder tmBuilder;
+    if (prevMetadata == null) {
+      tmBuilder = TableMetadata.buildFromEmpty();
+    } else {
+      tmBuilder = TableMetadata.buildFrom(prevMetadata).setPreviousFileLocation(prevMetadataFile);
+    }
+    tmBuilder
+        .setLocation("path/to/table")
+        .addSchema(
+            new Schema(List.of(Types.NestedField.of(1, false, "field1", Types.StringType.get()))),
+            1)
+        .addSortOrder(SortOrder.unsorted())
+        .assignUUID(UUID.randomUUID().toString())
+        .addPartitionSpec(PartitionSpec.unpartitioned());
+
+    int statisticsFileIndex = 0;
     for (Snapshot snapshot : snapshots) {
-      tmBuidler.addSnapshot(snapshot);
+      tmBuilder.addSnapshot(snapshot);
+      if (statisticsFiles != null) {
+        tmBuilder.setStatistics(snapshot.snapshotId(), statisticsFiles.get(statisticsFileIndex++));
+      }
     }
-    TableMetadata tableMetadata = tmBuidler.build();
+    TableMetadata tableMetadata = tmBuilder.build();
     PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
     out.write(TableMetadataParser.toJson(tableMetadata).getBytes(StandardCharsets.UTF_8));
     out.close();
+    return tableMetadata;
   }
 
   static @NotNull TestSnapshot newSnapshot(
@@ -103,4 +140,26 @@ static void writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
         new TestSnapshot(sequenceNumber, snapshotId, parentSnapshot, 1L, manifestListLocation);
     return snapshot;
   }
+
+  public static StatisticsFile writeStatsFile(
+      long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
+      throws IOException {
+    try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              List.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+
+      return new GenericStatisticsFile(
+          snapshotId,
+          statsLocation,
+          puffinWriter.fileSize(),
+          puffinWriter.footerSize(),
+          puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
+    }
+  }
 }