Skip to content

Commit f3686cb

Browse files
committed
Register VACUUM operations in the delta log
1 parent dd2d20e commit f3686cb

File tree

5 files changed

+199
-18
lines changed

5 files changed

+199
-18
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class DeltaLakeConfig
7575
private long defaultCheckpointWritingInterval = 10;
7676
private boolean checkpointFilteringEnabled = true;
7777
private Duration vacuumMinRetention = new Duration(7, DAYS);
78+
private boolean vacuumTransactionLoggingEnabled;
7879
private Optional<String> hiveCatalogName = Optional.empty();
7980
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
8081
private boolean tableStatisticsEnabled = true;
@@ -298,6 +299,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
298299
return this;
299300
}
300301

302+
public boolean isVacuumTransactionLoggingEnabled()
303+
{
304+
return vacuumTransactionLoggingEnabled;
305+
}
306+
307+
@Config("delta.vacuum.transaction-logging.enabled")
308+
@ConfigDescription("Whether to log vacuum information into the Delta transaction log")
309+
public DeltaLakeConfig setVacuumTransactionLoggingEnabled(boolean vacuumTransactionLoggingEnabled)
310+
{
311+
this.vacuumTransactionLoggingEnabled = vacuumTransactionLoggingEnabled;
312+
return this;
313+
}
314+
301315
public Optional<String> getHiveCatalogName()
302316
{
303317
return hiveCatalogName;

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
5252
{
5353
public static final String MAX_SPLIT_SIZE = "max_split_size";
5454
public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
55+
public static final String VACUUM_TRANSACTION_LOGGING_ENABLED = "vacuum_transaction_logging_enabled";
5556
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
5657
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
5758
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
@@ -96,6 +97,11 @@ public DeltaLakeSessionProperties(
9697
"Minimal retention period for vacuum procedure",
9798
deltaLakeConfig.getVacuumMinRetention(),
9899
false),
100+
booleanProperty(
101+
VACUUM_TRANSACTION_LOGGING_ENABLED,
102+
"Vacuum transaction logging enabled",
103+
deltaLakeConfig.isVacuumTransactionLoggingEnabled(),
104+
false),
99105
stringProperty(
100106
HIVE_CATALOG_NAME,
101107
"Catalog to redirect to when a Hive table is referenced",
@@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session)
255261
return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
256262
}
257263

264+
public static boolean isVacuumLoggingEnabled(ConnectorSession session)
265+
{
266+
return session.getProperty(VACUUM_TRANSACTION_LOGGING_ENABLED, Boolean.class);
267+
}
268+
258269
public static Optional<String> getHiveCatalogName(ConnectorSession session)
259270
{
260271
return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class));

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java

Lines changed: 123 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.plugin.deltalake.procedure;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import com.google.common.collect.ImmutableMap;
1718
import com.google.common.collect.ImmutableSet;
1819
import com.google.inject.Inject;
1920
import com.google.inject.Provider;
@@ -32,11 +33,15 @@
3233
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
3334
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
3435
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
36+
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
3537
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
3638
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
3739
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
3840
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
3941
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
42+
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
43+
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
44+
import io.trino.spi.NodeManager;
4045
import io.trino.spi.TrinoException;
4146
import io.trino.spi.catalog.CatalogName;
4247
import io.trino.spi.classloader.ThreadContextClassLoader;
@@ -52,7 +57,9 @@
5257
import java.lang.invoke.MethodHandle;
5358
import java.time.Instant;
5459
import java.util.ArrayList;
60+
import java.util.HashSet;
5561
import java.util.List;
62+
import java.util.Map;
5663
import java.util.Objects;
5764
import java.util.Optional;
5865
import java.util.Set;
@@ -62,12 +69,15 @@
6269
import static com.google.common.base.Predicates.alwaysFalse;
6370
import static com.google.common.collect.ImmutableList.toImmutableList;
6471
import static com.google.common.collect.ImmutableSet.toImmutableSet;
72+
import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
6573
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
6674
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
6775
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
6876
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
6977
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
7078
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
79+
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled;
80+
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
7181
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.DELETION_VECTORS_FEATURE_NAME;
7282
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
7383
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
@@ -100,18 +110,26 @@ public class VacuumProcedure
100110
private final TrinoFileSystemFactory fileSystemFactory;
101111
private final DeltaLakeMetadataFactory metadataFactory;
102112
private final TransactionLogAccess transactionLogAccess;
113+
private final TransactionLogWriterFactory transactionLogWriterFactory;
114+
private final String nodeVersion;
115+
private final String nodeId;
103116

104117
@Inject
105118
public VacuumProcedure(
106119
CatalogName catalogName,
107120
TrinoFileSystemFactory fileSystemFactory,
108121
DeltaLakeMetadataFactory metadataFactory,
109-
TransactionLogAccess transactionLogAccess)
122+
TransactionLogAccess transactionLogAccess,
123+
TransactionLogWriterFactory transactionLogWriterFactory,
124+
NodeManager nodeManager)
110125
{
111126
this.catalogName = requireNonNull(catalogName, "catalogName is null");
112127
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
113128
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
114129
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
130+
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
131+
this.nodeVersion = nodeManager.getCurrentNode().getVersion();
132+
this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
115133
}
116134

117135
@Override
@@ -166,6 +184,7 @@ private void doVacuum(
166184

167185
Duration retentionDuration = Duration.valueOf(retention);
168186
Duration minRetention = getVacuumMinRetention(session);
187+
boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session);
169188
checkProcedureArgument(
170189
retentionDuration.compareTo(minRetention) >= 0,
171190
"Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " +
@@ -206,10 +225,10 @@ private void doVacuum(
206225
}
207226

208227
TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()));
209-
String tableLocation = tableSnapshot.getTableLocation();
210-
String transactionLogDir = getTransactionLogDir(tableLocation);
228+
String tableLocationString = tableSnapshot.getTableLocation();
229+
String transactionLogDir = getTransactionLogDir(tableLocationString);
211230
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
212-
String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
231+
String commonPathPrefix = tableLocationString.endsWith("/") ? tableLocationString : tableLocationString + "/";
213232
String queryId = session.getQueryId();
214233

215234
// Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent").
@@ -240,7 +259,7 @@ private void doVacuum(
240259
.filter(Objects::nonNull)
241260
.map(RemoveFileEntry::path))) {
242261
retainedPaths = pathEntries
243-
.peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path))
262+
.peek(path -> checkState(!path.startsWith(tableLocationString), "Unexpected absolute path in transaction log: %s", path))
244263
.collect(toImmutableSet());
245264
}
246265
}
@@ -249,34 +268,44 @@ private void doVacuum(
249268
"[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention",
250269
queryId,
251270
tableName,
252-
tableLocation,
271+
tableLocationString,
253272
retention,
254273
threshold,
255274
retainedPaths.size());
256275

276+
Location tableLocation = Location.of(tableLocationString);
257277
long allPathsChecked = 0;
258278
long transactionLogFiles = 0;
259279
long retainedKnownFiles = 0;
260280
long retainedUnknownFiles = 0;
261281
List<TrinoInputFile> filesToDelete = new ArrayList<>();
282+
Set<String> vacuumedDirectories = new HashSet<>();
283+
vacuumedDirectories.add(tableLocation.path());
284+
long filesToDeleteSize = 0;
262285

263-
FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
286+
FileIterator listing = fileSystem.listFiles(tableLocation);
264287
while (listing.hasNext()) {
265288
FileEntry entry = listing.next();
289+
266290
String location = entry.location().toString();
267291
checkState(
268292
location.startsWith(commonPathPrefix),
269293
"Unexpected path [%s] returned when listing files under [%s]",
270294
location,
271-
tableLocation);
295+
tableLocationString);
272296
String relativePath = location.substring(commonPathPrefix.length());
273297
if (relativePath.isEmpty()) {
274-
// A file returned for "tableLocation/", might be possible on S3.
298+
// A file returned for "tableLocationString/", might be possible on S3.
275299
continue;
276300
}
277301
allPathsChecked++;
302+
Location parentDirectory = entry.location().parentDirectory();
303+
while (!areDirectoryLocationsEquivalent(parentDirectory, tableLocation)) {
304+
vacuumedDirectories.add(parentDirectory.path());
305+
parentDirectory = parentDirectory.parentDirectory();
306+
}
278307

279-
// ignore tableLocation/_delta_log/**
308+
// ignore tableLocationString/_delta_log/**
280309
if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) {
281310
log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location);
282311
transactionLogFiles++;
@@ -302,27 +331,104 @@ private void doVacuum(
302331
Location fileLocation = Location.of(location);
303332
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
304333
filesToDelete.add(inputFile);
334+
filesToDeleteSize += inputFile.length();
335+
}
336+
long readVersion = handle.getReadVersion();
337+
if (isVacuumLoggingEnabled) {
338+
logVacuumStart(session, handle.location(), readVersion, filesToDelete.size(), filesToDeleteSize);
305339
}
306340
int totalFilesToDelete = filesToDelete.size();
307341
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
308-
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
309-
int start = batchNumber * DELETE_BATCH_SIZE;
310-
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
342+
try {
343+
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
344+
int start = batchNumber * DELETE_BATCH_SIZE;
345+
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
311346

312-
List<TrinoInputFile> batch = filesToDelete.subList(start, end);
313-
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
347+
List<TrinoInputFile> batch = filesToDelete.subList(start, end);
348+
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
349+
}
350+
}
351+
catch (IOException e) {
352+
if (isVacuumLoggingEnabled) {
353+
// This mimics Delta behaviour where it sets metrics to 0 in case of a failure
354+
logVacuumEnd(session, handle.location(), readVersion, 0, 0, "FAILED");
355+
}
356+
throw e;
357+
}
358+
if (isVacuumLoggingEnabled) {
359+
logVacuumEnd(session, handle.location(), readVersion, filesToDelete.size(), vacuumedDirectories.size(), "COMPLETED");
314360
}
315-
316361
log.info(
317362
"[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
318363
queryId,
319364
tableName,
320-
tableLocation,
365+
tableLocationString,
321366
allPathsChecked,
322367
transactionLogFiles,
323368
retainedKnownFiles,
324369
retainedUnknownFiles,
325370
totalFilesToDelete);
326371
}
327372
}
373+
374+
private void logVacuumStart(ConnectorSession session, String location, long readVersion, long numFilesToDelete, long filesToDeleteSize)
375+
throws IOException
376+
{
377+
long createdTime = System.currentTimeMillis();
378+
long commitVersion = readVersion + 1;
379+
380+
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
381+
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
382+
session,
383+
commitVersion,
384+
createdTime,
385+
"VACUUM START",
386+
ImmutableMap.of("queryId", session.getQueryId()),
387+
ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)),
388+
readVersion));
389+
transactionLogWriter.flush();
390+
}
391+
392+
private void logVacuumEnd(ConnectorSession session, String location, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
393+
throws IOException
394+
{
395+
long createdTime = System.currentTimeMillis();
396+
long commitVersion = readVersion + 2;
397+
398+
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
399+
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
400+
session,
401+
commitVersion,
402+
createdTime,
403+
"VACUUM END",
404+
ImmutableMap.of("queryId", session.getQueryId(), "status", status),
405+
ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)),
406+
readVersion));
407+
transactionLogWriter.flush();
408+
}
409+
410+
private CommitInfoEntry getCommitInfoEntry(
411+
ConnectorSession session,
412+
long commitVersion,
413+
long createdTime,
414+
String operation, Map<String,
415+
String> operationParameters,
416+
Map<String, String> operationMetrics,
417+
long readVersion)
418+
{
419+
return new CommitInfoEntry(
420+
commitVersion,
421+
createdTime,
422+
session.getUser(),
423+
session.getUser(),
424+
operation,
425+
operationParameters,
426+
null,
427+
null,
428+
"trino-" + nodeVersion + "-" + nodeId,
429+
readVersion,
430+
IsolationLevel.WRITESERIALIZABLE.getValue(),
431+
Optional.of(true),
432+
operationMetrics);
433+
}
328434
}

0 commit comments

Comments
 (0)