plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -75,6 +75,7 @@ public class DeltaLakeConfig
private long defaultCheckpointWritingInterval = 10;
private boolean checkpointFilteringEnabled = true;
private Duration vacuumMinRetention = new Duration(7, DAYS);
private boolean vacuumTransactionLoggingEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
@@ -298,6 +299,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
return this;
}

public boolean isVacuumTransactionLoggingEnabled()
{
return vacuumTransactionLoggingEnabled;
}

@Config("delta.vacuum.transaction-logging.enabled")
@ConfigDescription("Whether to log vacuum information into the Delta transaction log")
public DeltaLakeConfig setVacuumTransactionLoggingEnabled(boolean vacuumTransactionLoggingEnabled)
{
this.vacuumTransactionLoggingEnabled = vacuumTransactionLoggingEnabled;
return this;
}

public Optional<String> getHiveCatalogName()
{
return hiveCatalogName;
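
The new delta.vacuum.transaction-logging.enabled property defaults to false, since the backing boolean field is left uninitialized. A minimal usage sketch of the toggle, assuming only the class in this diff (illustration, not part of the change):

import io.trino.plugin.deltalake.DeltaLakeConfig;

public final class VacuumLoggingToggleSketch
{
    public static void main(String[] args)
    {
        DeltaLakeConfig config = new DeltaLakeConfig();
        // Default: vacuum commits are not logged
        System.out.println(config.isVacuumTransactionLoggingEnabled()); // false

        // Fluent setter, bound by airlift to delta.vacuum.transaction-logging.enabled
        config.setVacuumTransactionLoggingEnabled(true);
        System.out.println(config.isVacuumTransactionLoggingEnabled()); // true
    }
}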
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
@@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
{
public static final String MAX_SPLIT_SIZE = "max_split_size";
public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
public static final String VACUUM_TRANSACTION_LOGGING_ENABLED = "vacuum_transaction_logging_enabled";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
@@ -96,6 +97,11 @@ public DeltaLakeSessionProperties(
"Minimal retention period for vacuum procedure",
deltaLakeConfig.getVacuumMinRetention(),
false),
booleanProperty(
VACUUM_TRANSACTION_LOGGING_ENABLED,
"Vacuum transaction logging enabled",
deltaLakeConfig.isVacuumTransactionLoggingEnabled(),
false),
stringProperty(
HIVE_CATALOG_NAME,
"Catalog to redirect to when a Hive table is referenced",
@@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session)
return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
}

public static boolean isVacuumLoggingEnabled(ConnectorSession session)
{
return session.getProperty(VACUUM_TRANSACTION_LOGGING_ENABLED, Boolean.class);
}

public static Optional<String> getHiveCatalogName(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class));
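
The session property mirrors the catalog default and can be overridden per query, e.g. SET SESSION example_delta.vacuum_transaction_logging_enabled = true (example_delta is a placeholder catalog name). A hedged sketch of a caller, mirroring how VacuumProcedure consults the helper below; note the accessor name (isVacuumLoggingEnabled) is shorter than the property name it reads:

import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.spi.connector.ConnectorSession;

final class VacuumLoggingGate
{
    private VacuumLoggingGate() {}

    // Resolves the per-session value of vacuum_transaction_logging_enabled;
    // the default was captured from DeltaLakeConfig when the session
    // properties were registered
    static boolean shouldLogVacuum(ConnectorSession session)
    {
        return DeltaLakeSessionProperties.isVacuumLoggingEnabled(session);
    }
}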
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
@@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -24,18 +25,23 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.base.util.UncheckedCloseable;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeMetadata;
import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.classloader.ThreadContextClassLoader;
@@ -51,7 +57,9 @@
import java.lang.invoke.MethodHandle;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -61,12 +69,15 @@
import static com.google.common.base.Predicates.alwaysFalse;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.DELETION_VECTORS_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
@@ -99,18 +110,26 @@ public class VacuumProcedure
private final TrinoFileSystemFactory fileSystemFactory;
private final DeltaLakeMetadataFactory metadataFactory;
private final TransactionLogAccess transactionLogAccess;
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final String nodeVersion;
private final String nodeId;

@Inject
public VacuumProcedure(
CatalogName catalogName,
TrinoFileSystemFactory fileSystemFactory,
DeltaLakeMetadataFactory metadataFactory,
TransactionLogAccess transactionLogAccess)
TransactionLogAccess transactionLogAccess,
TransactionLogWriterFactory transactionLogWriterFactory,
NodeManager nodeManager)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
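// Node version and identifier are stamped into each vacuum commit's
// engineInfo field as "trino-<version>-<nodeId>" (see getCommitInfoEntry below)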
this.nodeVersion = nodeManager.getCurrentNode().getVersion();
this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
}

@Override
@@ -165,6 +184,7 @@ private void doVacuum(

Duration retentionDuration = Duration.valueOf(retention);
Duration minRetention = getVacuumMinRetention(session);
boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session);
checkProcedureArgument(
retentionDuration.compareTo(minRetention) >= 0,
"Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " +
@@ -205,10 +225,10 @@
}

TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()));
String tableLocation = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocation);
String tableLocationString = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocationString);
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
String commonPathPrefix = tableLocationString.endsWith("/") ? tableLocationString : tableLocationString + "/";
String queryId = session.getQueryId();

// Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent").
@@ -239,7 +259,7 @@ private void doVacuum(
.filter(Objects::nonNull)
.map(RemoveFileEntry::path))) {
retainedPaths = pathEntries
.peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path))
.peek(path -> checkState(!path.startsWith(tableLocationString), "Unexpected absolute path in transaction log: %s", path))
.collect(toImmutableSet());
}
}
@@ -248,35 +268,44 @@
"[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention",
queryId,
tableName,
tableLocation,
tableLocationString,
retention,
threshold,
retainedPaths.size());

Location tableLocation = Location.of(tableLocationString);
long allPathsChecked = 0;
long transactionLogFiles = 0;
long retainedKnownFiles = 0;
long retainedUnknownFiles = 0;
long removedFiles = 0;
List<TrinoInputFile> filesToDelete = new ArrayList<>();
Set<String> vacuumedDirectories = new HashSet<>();
vacuumedDirectories.add(tableLocation.path());
long filesToDeleteSize = 0;

List<Location> filesToDelete = new ArrayList<>();
FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
FileIterator listing = fileSystem.listFiles(tableLocation);
while (listing.hasNext()) {
FileEntry entry = listing.next();

String location = entry.location().toString();
checkState(
location.startsWith(commonPathPrefix),
"Unexpected path [%s] returned when listing files under [%s]",
location,
tableLocation);
tableLocationString);
String relativePath = location.substring(commonPathPrefix.length());
if (relativePath.isEmpty()) {
// A file returned for "tableLocation/", might be possible on S3.
// A file returned for "tableLocationString/", might be possible on S3.
continue;
}
allPathsChecked++;
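// Track every ancestor directory of this file up to the table root (seeded
// into vacuumedDirectories above), so that the VACUUM END commit can report
// numVacuumedDirectories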
Location parentDirectory = entry.location().parentDirectory();
while (!areDirectoryLocationsEquivalent(parentDirectory, tableLocation)) {
vacuumedDirectories.add(parentDirectory.path());
parentDirectory = parentDirectory.parentDirectory();
}

// ignore tableLocation/_delta_log/**
// ignore tableLocationString/_delta_log/**
if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) {
log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location);
transactionLogFiles++;
@@ -297,31 +326,109 @@ private void doVacuum(
retainedUnknownFiles++;
continue;
}

log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime);
filesToDelete.add(entry.location());
if (filesToDelete.size() == DELETE_BATCH_SIZE) {
fileSystem.deleteFiles(filesToDelete);
removedFiles += filesToDelete.size();
filesToDelete.clear();
}
}

if (!filesToDelete.isEmpty()) {
fileSystem.deleteFiles(filesToDelete);
removedFiles += filesToDelete.size();
Location fileLocation = Location.of(location);
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
filesToDelete.add(inputFile);
filesToDeleteSize += inputFile.length();
}
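// Nothing has been deleted yet: VACUUM START below records the planned work
// (file count and total size in bytes) at version readVersion + 1, before the
// batched deletes run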
long readVersion = handle.getReadVersion();
if (isVacuumLoggingEnabled) {
logVacuumStart(session, handle.location(), readVersion, filesToDelete.size(), filesToDeleteSize);
}
int totalFilesToDelete = filesToDelete.size();
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
try {
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
int start = batchNumber * DELETE_BATCH_SIZE;
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);

List<TrinoInputFile> batch = filesToDelete.subList(start, end);
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
}
}
catch (IOException e) {
if (isVacuumLoggingEnabled) {
// This mimics Delta behaviour where it sets metrics to 0 in case of a failure
logVacuumEnd(session, handle.location(), readVersion, 0, 0, "FAILED");
}
throw e;
}
if (isVacuumLoggingEnabled) {
logVacuumEnd(session, handle.location(), readVersion, filesToDelete.size(), vacuumedDirectories.size(), "COMPLETED");
}
log.info(
"[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
queryId,
tableName,
tableLocation,
tableLocationString,
allPathsChecked,
transactionLogFiles,
retainedKnownFiles,
retainedUnknownFiles,
removedFiles);
totalFilesToDelete);
}
}

private void logVacuumStart(ConnectorSession session, String location, long readVersion, long numFilesToDelete, long filesToDeleteSize)
throws IOException
{
long createdTime = System.currentTimeMillis();
long commitVersion = readVersion + 1;

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
session,
commitVersion,
createdTime,
"VACUUM START",
ImmutableMap.of("queryId", session.getQueryId()),
ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)),
readVersion));
transactionLogWriter.flush();
}

private void logVacuumEnd(ConnectorSession session, String location, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
throws IOException
{
long createdTime = System.currentTimeMillis();
long commitVersion = readVersion + 2;

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
session,
commitVersion,
createdTime,
"VACUUM END",
ImmutableMap.of("queryId", session.getQueryId(), "status", status),
ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)),
readVersion));
transactionLogWriter.flush();
}

private CommitInfoEntry getCommitInfoEntry(
ConnectorSession session,
long commitVersion,
long createdTime,
String operation,
Map<String, String> operationParameters,
Map<String, String> operationMetrics,
long readVersion)
{
return new CommitInfoEntry(
commitVersion,
createdTime,
session.getUser(),
session.getUser(),
operation,
operationParameters,
null,
null,
"trino-" + nodeVersion + "-" + nodeId,
readVersion,
IsolationLevel.WRITESERIALIZABLE.getValue(),
Optional.of(true),
operationMetrics);
}
}
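
With logging enabled, a successful vacuum therefore appends two commits to _delta_log: VACUUM START at readVersion + 1 with the planned numFilesToDelete/sizeOfDataToDelete, then VACUUM END at readVersion + 2 with numDeletedFiles, numVacuumedDirectories, and status COMPLETED (or FAILED, with metrics zeroed, when a delete batch throws). A hedged sketch of the resulting entries for a table at read version 41; field layout follows Delta's commitInfo convention and the CommitInfoEntry arguments above, but the exact serialized JSON may differ:

// _delta_log/00000000000000000042.json  (VACUUM START)
// {"commitInfo":{"timestamp":1714500000000,"userId":"alice","userName":"alice",
//     "operation":"VACUUM START",
//     "operationParameters":{"queryId":"20240501_000000_00001_abcde"},
//     "engineInfo":"trino-<version>-<nodeId>","readVersion":41,
//     "isolationLevel":"WriteSerializable","isBlindAppend":true,
//     "operationMetrics":{"numFilesToDelete":"120","sizeOfDataToDelete":"1048576"}}}
//
// _delta_log/00000000000000000043.json  (VACUUM END)
// {"commitInfo":{"operation":"VACUUM END",
//     "operationParameters":{"queryId":"20240501_000000_00001_abcde","status":"COMPLETED"},
//     "operationMetrics":{"numDeletedFiles":"120","numVacuumedDirectories":"3"},
//     "readVersion":41, ...}}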