- 
                Notifications
    You must be signed in to change notification settings 
- Fork 320
Async Deletion of Previous Metadata and Statistics Files #312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
d0cd456
              26e03ac
              0f8e8f4
              1b525de
              e8b26d2
              2ee6dee
              806f46d
              4f1d3c9
              0a77bfa
              47dc60a
              9d835b3
              40c6147
              f354d1c
              88c6651
              af3efab
              278ab7e
              ed30fb0
              05c3dd9
              49dbe68
              8eea50d
              d9804e6
              54511de
              56ba4f2
              e92852e
              27ea1b3
              4d1b68b
              eb533d7
              47f760f
              988e530
              4965d5c
              5f81483
              097189c
              651ece0
              16bb5fe
              d276ae6
              187b47e
              ba5c47c
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -109,6 +109,7 @@ | |
| import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; | ||
| import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler; | ||
| import org.apache.polaris.service.task.TableCleanupTaskHandler; | ||
| import org.apache.polaris.service.task.TableContentCleanupTaskHandler; | ||
| import org.apache.polaris.service.task.TaskExecutorImpl; | ||
| import org.apache.polaris.service.task.TaskFileIOSupplier; | ||
| import org.apache.polaris.service.throttling.StreamReadConstraintsExceptionMapper; | ||
|  | @@ -217,6 +218,9 @@ public void run(PolarisApplicationConfig configuration, Environment environment) | |
| taskExecutor.addTaskHandler( | ||
| new ManifestFileCleanupTaskHandler( | ||
| fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor())); | ||
| taskExecutor.addTaskHandler( | ||
| new TableContentCleanupTaskHandler( | ||
| fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor())); | ||
|          | ||
|  | ||
| LOGGER.info( | ||
| "Initializing PolarisCallContextCatalogFactory for metaStoreManagerType {}", | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -22,7 +22,9 @@ | |
| import java.util.UUID; | ||
| import java.util.function.Function; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
| import org.apache.iceberg.ManifestFile; | ||
| import org.apache.iceberg.StatisticsFile; | ||
| import org.apache.iceberg.TableMetadata; | ||
| import org.apache.iceberg.TableMetadataParser; | ||
| import org.apache.iceberg.io.FileIO; | ||
|  | @@ -49,6 +51,7 @@ public class TableCleanupTaskHandler implements TaskHandler { | |
| private final TaskExecutor taskExecutor; | ||
| private final MetaStoreManagerFactory metaStoreManagerFactory; | ||
| private final Function<TaskEntity, FileIO> fileIOSupplier; | ||
| private static final int BATCH_SIZE = 10; | ||
|          | ||
|  | ||
| public TableCleanupTaskHandler( | ||
| TaskExecutor taskExecutor, | ||
|  | @@ -158,11 +161,98 @@ public boolean handleTask(TaskEntity cleanupTask) { | |
| for (PolarisBaseEntity createdTask : createdTasks) { | ||
| taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext()); | ||
| } | ||
|  | ||
| // Schedule and dispatch prev metadata and stat files in seperated tasks | ||
| scheduleTableContentCleanupTask( | ||
| tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), | ||
| CleanupTableContentFileType.PREV_METADATA, | ||
| fileIO, | ||
| cleanupTask, | ||
| metaStoreManager, | ||
| polarisCallContext); | ||
| scheduleTableContentCleanupTask( | ||
| tableMetadata.statisticsFiles().stream().map(StatisticsFile::path), | ||
| CleanupTableContentFileType.STATISTICS, | ||
| fileIO, | ||
| cleanupTask, | ||
| metaStoreManager, | ||
| polarisCallContext); | ||
|          | ||
|  | ||
| fileIO.deleteFile(tableEntity.getMetadataLocation()); | ||
|  | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|  | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extract previous manifest task creation into a new method, no new change added for line 152 - 201 | ||
| private void scheduleTableContentCleanupTask( | ||
| Stream<String> fileStream, | ||
| CleanupTableContentFileType fileType, | ||
| FileIO fileIO, | ||
| TaskEntity cleanupTask, | ||
| PolarisMetaStoreManager metaStoreManager, | ||
| PolarisCallContext polarisCallContext) { | ||
| PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class); | ||
| TableLikeEntity tableEntity = TableLikeEntity.of(entity); | ||
|  | ||
| List<String> validFiles = fileStream.filter(file -> TaskUtils.exists(file, fileIO)).toList(); | ||
|  | ||
| for (int i = 0; i < validFiles.size(); i += BATCH_SIZE) { | ||
| List<String> fileBatch = validFiles.subList(i, Math.min(i + BATCH_SIZE, validFiles.size())); | ||
| String taskName = cleanupTask.getName() + "_batch" + i + "_" + UUID.randomUUID(); | ||
| LOGGER | ||
| .atDebug() | ||
| .addKeyValue("taskName", taskName) | ||
| .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier()) | ||
| .addKeyValue("fileBatch", fileBatch.toString()) | ||
| .log("Queueing task to delete a batch of " + fileType.getTypeName()); | ||
|  | ||
| TaskEntity batchTask = | ||
| new TaskEntity.Builder() | ||
| .setName(taskName) | ||
| .setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId()) | ||
| .setCreateTimestamp(polarisCallContext.getClock().millis()) | ||
| .withTaskType(AsyncTaskType.TABLE_CONTENT_CLEANUP) | ||
| .withData( | ||
| new TableContentCleanupTaskHandler.TableContentCleanupTask( | ||
| tableEntity.getTableIdentifier(), fileBatch)) | ||
| .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) | ||
| .build(); | ||
|  | ||
| List<PolarisBaseEntity> createdTasks = | ||
| metaStoreManager | ||
| .createEntitiesIfNotExist(polarisCallContext, null, List.of(batchTask)) | ||
| .getEntities(); | ||
|  | ||
| if (createdTasks != null) { | ||
| LOGGER | ||
| .atInfo() | ||
| .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier()) | ||
| .addKeyValue("taskCount", createdTasks.size()) | ||
| .addKeyValue("fileBatch", fileBatch.toString()) | ||
| .log("Successfully queued task to delete a batch of " + fileType.getTypeName() + "s"); | ||
|  | ||
| for (PolarisBaseEntity createdTask : createdTasks) { | ||
| taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|  | ||
| private enum CleanupTableContentFileType { | ||
| PREV_METADATA("previous metadata file"), | ||
| STATISTICS("statistics file"), | ||
| ; | ||
|  | ||
| private final String typeName; | ||
|  | ||
| CleanupTableContentFileType(String typeName) { | ||
| this.typeName = typeName; | ||
| } | ||
|  | ||
| public String getTypeName() { | ||
| return typeName; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,189 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.service.task; | ||
|  | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Function; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
| import org.apache.iceberg.io.FileIO; | ||
| import org.apache.polaris.core.entity.AsyncTaskType; | ||
| import org.apache.polaris.core.entity.TaskEntity; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|  | ||
| /** | ||
| * {@link TaskHandler} responsible for deleting previous metadata and statistics files of a table. | ||
| */ | ||
| public class TableContentCleanupTaskHandler implements TaskHandler { | ||
|          | ||
| public static final int MAX_ATTEMPTS = 3; | ||
| public static final int FILE_DELETION_RETRY_MILLIS = 100; | ||
| private static final Logger LOGGER = | ||
| LoggerFactory.getLogger(TableContentCleanupTaskHandler.class); | ||
| private final Function<TaskEntity, FileIO> fileIOSupplier; | ||
| private final ExecutorService executorService; | ||
|  | ||
| public TableContentCleanupTaskHandler( | ||
| Function<TaskEntity, FileIO> fileIOSupplier, ExecutorService executorService) { | ||
| this.fileIOSupplier = fileIOSupplier; | ||
| this.executorService = executorService; | ||
| } | ||
|  | ||
| @Override | ||
| public boolean canHandleTask(TaskEntity task) { | ||
| return task.getTaskType() == AsyncTaskType.TABLE_CONTENT_CLEANUP; | ||
| } | ||
|  | ||
| @Override | ||
| public boolean handleTask(TaskEntity task) { | ||
| TableContentCleanupTask cleanupTask = task.readData(TableContentCleanupTask.class); | ||
| List<String> fileBatch = cleanupTask.getFileBatch(); | ||
| TableIdentifier tableId = cleanupTask.getTableId(); | ||
| try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) { | ||
| List<String> validFiles = | ||
| fileBatch.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList(); | ||
| if (validFiles.isEmpty()) { | ||
| LOGGER | ||
| .atWarn() | ||
| .addKeyValue("taskName", task.getName()) | ||
| .addKeyValue("fileBatch", fileBatch.toString()) | ||
| .addKeyValue("tableId", tableId) | ||
| .log("Table content cleanup task scheduled, but the none of the file in batch exists"); | ||
| return true; | ||
| } | ||
|  | ||
| // Schedule the deletion for each file asynchronously | ||
| List<CompletableFuture<Void>> deleteFutures = | ||
| validFiles.stream() | ||
| .map(file -> tryDelete(tableId, authorizedFileIO, file, null, 1)) | ||
| .toList(); | ||
|  | ||
| // Wait for all delete operations to finish | ||
| CompletableFuture<Void> allDeletes = | ||
| CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); | ||
| allDeletes.join(); | ||
|  | ||
| LOGGER | ||
| .atInfo() | ||
| .addKeyValue("taskName", task.getName()) | ||
| .addKeyValue("fileBatch", fileBatch.toString()) | ||
| .addKeyValue("tableId", tableId) | ||
| .log("All the files in task have been deleted"); | ||
|  | ||
| return true; | ||
| } catch (Exception e) { | ||
| LOGGER.error("Error during table content cleanup for file batch {}", fileBatch.toString(), e); | ||
| return false; | ||
| } | ||
| } | ||
|  | ||
| private CompletableFuture<Void> tryDelete( | ||
| TableIdentifier tableId, FileIO fileIO, String filePath, Throwable e, int attempt) { | ||
| if (e != null && attempt <= MAX_ATTEMPTS) { | ||
| LOGGER | ||
| .atWarn() | ||
| .addKeyValue("filePath", filePath) | ||
| .addKeyValue("attempt", attempt) | ||
| .addKeyValue("error", e.getMessage()) | ||
| .log("Error encountered attempting to delete file"); | ||
| } | ||
|  | ||
| if (attempt > MAX_ATTEMPTS && e != null) { | ||
| return CompletableFuture.failedFuture(e); | ||
| } | ||
|  | ||
| return CompletableFuture.runAsync( | ||
| () -> { | ||
| if (TaskUtils.exists(filePath, fileIO)) { | ||
| fileIO.deleteFile(filePath); | ||
| LOGGER | ||
| .atInfo() | ||
| .addKeyValue("filePath", filePath) | ||
| .addKeyValue("tableId", tableId) | ||
| .addKeyValue("attempt", attempt) | ||
| .log("Successfully deleted file {}", filePath); | ||
| } else { | ||
| LOGGER | ||
| .atInfo() | ||
| .addKeyValue("filePath", filePath) | ||
| .addKeyValue("tableId", tableId) | ||
| .log("File doesn't exist, likely already deleted"); | ||
| } | ||
| }, | ||
| executorService) | ||
| .exceptionallyComposeAsync( | ||
| newEx -> { | ||
| LOGGER | ||
| .atWarn() | ||
| .addKeyValue("filePath", filePath) | ||
| .addKeyValue("tableId", tableId) | ||
| .log("Exception caught deleting table content file", newEx); | ||
| return tryDelete(tableId, fileIO, filePath, newEx, attempt + 1); | ||
| }, | ||
| CompletableFuture.delayedExecutor( | ||
| FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); | ||
| } | ||
|  | ||
| public static final class TableContentCleanupTask { | ||
| private TableIdentifier tableId; | ||
| private List<String> fileBatch; | ||
|  | ||
| public TableContentCleanupTask() {} | ||
|  | ||
| public TableContentCleanupTask(TableIdentifier tableId, List<String> fileBatch) { | ||
| this.tableId = tableId; | ||
| this.fileBatch = fileBatch; | ||
| } | ||
|  | ||
| public TableIdentifier getTableId() { | ||
| return tableId; | ||
| } | ||
|  | ||
| public void setTableId(TableIdentifier tableId) { | ||
| this.tableId = tableId; | ||
| } | ||
|  | ||
| public List<String> getFileBatch() { | ||
| return fileBatch; | ||
| } | ||
|  | ||
| public void setFileBatch(List<String> fileBatch) { | ||
| this.fileBatch = fileBatch; | ||
| } | ||
|  | ||
| @Override | ||
| public boolean equals(Object object) { | ||
| if (this == object) { | ||
| return true; | ||
| } | ||
| if (!(object instanceof TableContentCleanupTask other)) { | ||
| return false; | ||
| } | ||
| return Objects.equals(tableId, other.tableId) && Objects.equals(fileBatch, other.fileBatch); | ||
| } | ||
|  | ||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(tableId, fileBatch.toString()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
FILE_CLEANUPtype was poorly named. We should make it specific, as only theManifestFileCleanupTaskHandlerdeals with it. That said, the actual serialized value is only the integer (see https://github.com/collado-mike/polaris/blob/8cb6b44bf57dc597dab612d109d3eb534aef5715/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java#L34-L37 ), so we can rename it and maintain backward compatibility.With that in mind, I think we should rename
FILE_CLEANUP->MANIFEST_FILE_CLEANUPand add two enums for the new file types:METADATA_LOG_ENTRY_CLEANUPandSTATISTICS_FILE_CLEANUP. Your task handler can look for instances of both types. I think that gives us flexibility in the future if we need to handle the different file types differently.