diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
index 1d553252dfe8..ecc96ea5c2ee 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -75,6 +75,7 @@ public class DeltaLakeConfig
     private long defaultCheckpointWritingInterval = 10;
     private boolean checkpointFilteringEnabled = true;
     private Duration vacuumMinRetention = new Duration(7, DAYS);
+    private boolean vacuumTransactionLoggingEnabled;
     private Optional<String> hiveCatalogName = Optional.empty();
     private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
     private boolean tableStatisticsEnabled = true;
@@ -298,6 +299,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
         return this;
     }
 
+    public boolean isVacuumTransactionLoggingEnabled()
+    {
+        return vacuumTransactionLoggingEnabled;
+    }
+
+    @Config("delta.vacuum.transaction-logging.enabled")
+    @ConfigDescription("Whether to log vacuum information into the Delta transaction log")
+    public DeltaLakeConfig setVacuumTransactionLoggingEnabled(boolean vacuumTransactionLoggingEnabled)
+    {
+        this.vacuumTransactionLoggingEnabled = vacuumTransactionLoggingEnabled;
+        return this;
+    }
+
     public Optional<String> getHiveCatalogName()
     {
         return hiveCatalogName;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
index 01e744dc3047..58cb23066a28 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
@@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
 {
     public static final String MAX_SPLIT_SIZE = "max_split_size";
     public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
+    public static final String VACUUM_TRANSACTION_LOGGING_ENABLED = "vacuum_transaction_logging_enabled";
     private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
     private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
     private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
@@ -96,6 +97,11 @@ public DeltaLakeSessionProperties(
                         "Minimal retention period for vacuum procedure",
                         deltaLakeConfig.getVacuumMinRetention(),
                         false),
+                booleanProperty(
+                        VACUUM_TRANSACTION_LOGGING_ENABLED,
+                        "Vacuum transaction logging enabled",
+                        deltaLakeConfig.isVacuumTransactionLoggingEnabled(),
+                        false),
                 stringProperty(
                         HIVE_CATALOG_NAME,
                         "Catalog to redirect to when a Hive table is referenced",
@@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session)
         return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
     }
 
+    public static boolean isVacuumLoggingEnabled(ConnectorSession session)
+    {
+        return session.getProperty(VACUUM_TRANSACTION_LOGGING_ENABLED, Boolean.class);
+    }
+
     public static Optional<String> getHiveCatalogName(ConnectorSession session)
     {
         return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class));
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
index e84a5d700c29..d21f81986427 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.deltalake.procedure;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -24,6 +25,7 @@
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.TrinoInputFile;
 import io.trino.plugin.base.util.UncheckedCloseable;
 import io.trino.plugin.deltalake.DeltaLakeConfig;
 import io.trino.plugin.deltalake.DeltaLakeMetadata;
@@ -31,11 +33,15 @@
 import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
 import io.trino.plugin.deltalake.DeltaLakeTableHandle;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
+import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
 import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
+import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
+import io.trino.spi.NodeManager;
 import io.trino.spi.TrinoException;
 import io.trino.spi.catalog.CatalogName;
 import io.trino.spi.classloader.ThreadContextClassLoader;
@@ -51,7 +57,9 @@
 import java.lang.invoke.MethodHandle;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -61,12 +69,15 @@
 import static com.google.common.base.Predicates.alwaysFalse;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
 import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
+import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.DELETION_VECTORS_FEATURE_NAME;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
@@ -99,18 +110,26 @@ public class VacuumProcedure
     private final TrinoFileSystemFactory fileSystemFactory;
     private final DeltaLakeMetadataFactory metadataFactory;
     private final TransactionLogAccess transactionLogAccess;
+    private final TransactionLogWriterFactory transactionLogWriterFactory;
+    private final String nodeVersion;
+    private final String nodeId;
 
     @Inject
     public VacuumProcedure(
             CatalogName catalogName,
             TrinoFileSystemFactory fileSystemFactory,
             DeltaLakeMetadataFactory metadataFactory,
-            TransactionLogAccess transactionLogAccess)
+            TransactionLogAccess transactionLogAccess,
+            TransactionLogWriterFactory transactionLogWriterFactory,
+            NodeManager nodeManager)
     {
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
+        this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
+        this.nodeVersion = nodeManager.getCurrentNode().getVersion();
+        this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
     }
 
     @Override
@@ -165,6 +184,7 @@ private void doVacuum(
 
         Duration retentionDuration = Duration.valueOf(retention);
         Duration minRetention = getVacuumMinRetention(session);
+        boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session);
         checkProcedureArgument(
                 retentionDuration.compareTo(minRetention) >= 0,
                 "Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " +
@@ -205,10 +225,10 @@ private void doVacuum(
             }
 
             TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()));
-            String tableLocation = tableSnapshot.getTableLocation();
-            String transactionLogDir = getTransactionLogDir(tableLocation);
+            String tableLocationString = tableSnapshot.getTableLocation();
+            String transactionLogDir = getTransactionLogDir(tableLocationString);
             TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-            String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
+            String commonPathPrefix = tableLocationString.endsWith("/") ? tableLocationString : tableLocationString + "/";
             String queryId = session.getQueryId();
 
             // Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent").
@@ -239,7 +259,7 @@ private void doVacuum(
                             .filter(Objects::nonNull)
                             .map(RemoveFileEntry::path))) {
                 retainedPaths = pathEntries
                        .peek(path -> checkState(!path.startsWith(tableLocationString), "Unexpected absolute path in transaction log: %s", path))
                        .collect(toImmutableSet());
            }
        }
@@ -248,35 +268,44 @@ private void doVacuum(
                     "[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention",
                     queryId,
                     tableName,
-                    tableLocation,
+                    tableLocationString,
                     retention,
                     threshold,
                     retainedPaths.size());
 
+            Location tableLocation = Location.of(tableLocationString);
             long allPathsChecked = 0;
             long transactionLogFiles = 0;
             long retainedKnownFiles = 0;
             long retainedUnknownFiles = 0;
-            long removedFiles = 0;
+            List<TrinoInputFile> filesToDelete = new ArrayList<>();
+            Set<String> vacuumedDirectories = new HashSet<>();
+            vacuumedDirectories.add(tableLocation.path());
+            long filesToDeleteSize = 0;
 
-            List<Location> filesToDelete = new ArrayList<>();
-            FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
+            FileIterator listing = fileSystem.listFiles(tableLocation);
             while (listing.hasNext()) {
                 FileEntry entry = listing.next();
                 String location = entry.location().toString();
                 checkState(
                         location.startsWith(commonPathPrefix),
                         "Unexpected path [%s] returned when listing files under [%s]",
                         location,
-                        tableLocation);
+                        tableLocationString);
                 String relativePath = location.substring(commonPathPrefix.length());
                 if (relativePath.isEmpty()) {
-                    // A file returned for "tableLocation/", might be possible on S3.
+                    // A file returned for "tableLocationString/", might be possible on S3.
                     continue;
                 }
                 allPathsChecked++;
+                Location parentDirectory = entry.location().parentDirectory();
+                while (!areDirectoryLocationsEquivalent(parentDirectory, tableLocation)) {
+                    vacuumedDirectories.add(parentDirectory.path());
+                    parentDirectory = parentDirectory.parentDirectory();
+                }
 
-                // ignore tableLocation/_delta_log/**
+                // ignore tableLocationString/_delta_log/**
                 if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) {
                     log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location);
                     transactionLogFiles++;
@@ -297,31 +326,109 @@ private void doVacuum(
                     retainedUnknownFiles++;
                     continue;
                 }
-                log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime);
-                filesToDelete.add(entry.location());
-                if (filesToDelete.size() == DELETE_BATCH_SIZE) {
-                    fileSystem.deleteFiles(filesToDelete);
-                    removedFiles += filesToDelete.size();
-                    filesToDelete.clear();
-                }
-            }
-            if (!filesToDelete.isEmpty()) {
-                fileSystem.deleteFiles(filesToDelete);
-                removedFiles += filesToDelete.size();
+                Location fileLocation = Location.of(location);
+                TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
+                filesToDelete.add(inputFile);
+                filesToDeleteSize += inputFile.length();
+            }
+            long readVersion = handle.getReadVersion();
+            if (isVacuumLoggingEnabled) {
+                logVacuumStart(session, handle.location(), readVersion, filesToDelete.size(), filesToDeleteSize);
             }
+            int totalFilesToDelete = filesToDelete.size();
+            int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
+            try {
+                for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
+                    int start = batchNumber * DELETE_BATCH_SIZE;
+                    int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
+                    List<TrinoInputFile> batch = filesToDelete.subList(start, end);
+                    fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
+                }
+            }
+            catch (IOException e) {
+                if (isVacuumLoggingEnabled) {
+                    // This mimics Delta behaviour where it sets metrics to 0 in case of a failure
+                    logVacuumEnd(session, handle.location(), readVersion, 0, 0, "FAILED");
+                }
+                throw e;
+            }
+            if (isVacuumLoggingEnabled) {
+                logVacuumEnd(session, handle.location(), readVersion, filesToDelete.size(), vacuumedDirectories.size(), "COMPLETED");
+            }
             log.info(
                     "[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
                     queryId,
                     tableName,
-                    tableLocation,
+                    tableLocationString,
                     allPathsChecked,
                     transactionLogFiles,
                     retainedKnownFiles,
                     retainedUnknownFiles,
-                    removedFiles);
+                    totalFilesToDelete);
         }
     }
+
+    private void logVacuumStart(ConnectorSession session, String location, long readVersion, long numFilesToDelete, long filesToDeleteSize)
+            throws IOException
+    {
+        long createdTime = System.currentTimeMillis();
+        long commitVersion = readVersion + 1;
+
+        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
+                session,
+                commitVersion,
+                createdTime,
+                "VACUUM START",
+                ImmutableMap.of("queryId", session.getQueryId()),
+                ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)),
+                readVersion));
+        transactionLogWriter.flush();
+    }
+
+    private void logVacuumEnd(ConnectorSession session, String location, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
+            throws IOException
+    {
+        long createdTime = System.currentTimeMillis();
+        long commitVersion = readVersion + 2;
+
+        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
+                session,
+                commitVersion,
+                createdTime,
+                "VACUUM END",
+                ImmutableMap.of("queryId", session.getQueryId(), "status", status),
+                ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)),
+                readVersion));
+        transactionLogWriter.flush();
+    }
+
+    private CommitInfoEntry getCommitInfoEntry(
+            ConnectorSession session,
+            long commitVersion,
+            long createdTime,
+            String operation, Map<String, String> operationParameters,
+            Map<String, String> operationMetrics,
+            long readVersion)
+    {
+        return new CommitInfoEntry(
+                commitVersion,
+                createdTime,
+                session.getUser(),
+                session.getUser(),
+                operation,
+                operationParameters,
+                null,
+                null,
+                "trino-" + nodeVersion + "-" + nodeId,
+                readVersion,
+                IsolationLevel.WRITESERIALIZABLE.getValue(),
+                Optional.of(true),
+                operationMetrics);
+    }
 }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
index b143ce622d3a..d9e860f035c4 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
@@ -230,6 +230,7 @@ private QueryRunner createDeltaLakeQueryRunner()
                         .put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
                         .put("delta.register-table-procedure.enabled", "true")
                         .put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
+                        .put("delta.vacuum.transaction-logging.enabled", "true")
                         .putAll(deltaStorageConfiguration())
                         .buildOrThrow())
                 .setSchemaLocation(getLocationForTable(bucketName, SCHEMA))
@@ -1709,6 +1710,19 @@ public void testStatsSplitPruningBasedOnSepCreatedCheckpointOnTopOfCheckpointWit
     @Test
     public void testVacuum()
             throws Exception
+    {
+        testVacuumTransactionLoggingOnCheckpoint(false);
+    }
+
+    @Test
+    public void testVacuumWithTransactionLoggingOnCheckpoint()
+            throws Exception
+    {
+        testVacuumTransactionLoggingOnCheckpoint(true);
+    }
+
+    private void testVacuumTransactionLoggingOnCheckpoint(boolean isCheckPointInterval)
+            throws InterruptedException
     {
         String catalog = getSession().getCatalog().orElseThrow();
         String tableName = "test_vacuum" + randomNameSuffix();
@@ -1716,8 +1730,11 @@ public void testVacuum()
         Session sessionWithShortRetentionUnlocked = Session.builder(getSession())
                 .setCatalogSessionProperty(catalog, "vacuum_min_retention", "0s")
                 .build();
+        Session sessionWithVacuumLoggingDisabled = Session.builder(sessionWithShortRetentionUnlocked)
+                .setCatalogSessionProperty(catalog, "vacuum_transaction_logging_enabled", "false")
+                .build();
         assertUpdate(
-                format("CREATE TABLE %s WITH (location = '%s', partitioned_by = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", tableName, tableLocation),
+                format("CREATE TABLE %s WITH (location = '%s', partitioned_by = ARRAY['regionkey']%s) AS SELECT * FROM tpch.tiny.nation", tableName, tableLocation, isCheckPointInterval ? "" : ", checkpoint_interval = 1"),
                 25);
         try {
             Set<String> initialFiles = getActiveFiles(tableName);
@@ -1746,6 +1763,36 @@ public void testVacuum()
             assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
             // old files should be cleaned up
             assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
+            // operations should be logged
+            assertThat(query("SELECT version, operation, MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId') FROM \"" + tableName + "$history\"")).matches(
+                    """
+                    VALUES
+                        (CAST(0 AS BIGINT), CAST('CREATE TABLE AS SELECT' AS VARCHAR), CAST(MAP() AS MAP(VARCHAR, VARCHAR))),
+                        (1, 'MERGE', MAP()),
+                        (2, 'VACUUM START', MAP()),
+                        (3, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED'])),
+                        (4, 'VACUUM START', MAP()),
+                        (5, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED']))""");
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['sizeOfDataToDelete'] FROM \"" + tableName + "$history\" WHERE version = 2"))).isEqualTo(0);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numFilesToDelete'] FROM \"" + tableName + "$history\" WHERE version = 2"))).isEqualTo(0);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numDeletedFiles'] FROM \"" + tableName + "$history\" WHERE version = 3"))).isEqualTo(0);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numVacuumedDirectories'] FROM \"" + tableName + "$history\" WHERE version = 3"))).isEqualTo(8);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['sizeOfDataToDelete'] FROM \"" + tableName + "$history\" WHERE version = 4"))).isGreaterThanOrEqualTo(4617);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numFilesToDelete'] FROM \"" + tableName + "$history\" WHERE version = 4"))).isEqualTo(5);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numDeletedFiles'] FROM \"" + tableName + "$history\" WHERE version = 5"))).isEqualTo(5);
+            assertThat(Integer.parseInt((String) computeScalar("SELECT operation_metrics['numVacuumedDirectories'] FROM \"" + tableName + "$history\" WHERE version = 5"))).isEqualTo(8);
+
+            assertUpdate(sessionWithVacuumLoggingDisabled, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "', retention => '1s')");
+            // no new vacuum logging operations are logged
+            assertQuery("SELECT version, operation FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        (0, 'CREATE TABLE AS SELECT'),
+                        (1, 'MERGE'),
+                        (2, 'VACUUM START'),
+                        (3, 'VACUUM END'),
+                        (4, 'VACUUM START'),
+                        (5, 'VACUUM END')""");
         }
         finally {
             assertUpdate("DROP TABLE " + tableName);
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
index fb4555c6fb1d..c26d8e475a1c 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
@@ -56,6 +56,7 @@ public void testDefaults()
                 .setCheckpointFilteringEnabled(true)
                 .setCheckpointRowStatisticsWritingEnabled(true)
                 .setVacuumMinRetention(new Duration(7, DAYS))
+                .setVacuumTransactionLoggingEnabled(false)
                 .setHiveCatalogName(null)
                 .setDynamicFilteringWaitTimeout(new Duration(0, SECONDS))
                 .setTableStatisticsEnabled(true)
@@ -99,6 +100,7 @@ public void testExplicitPropertyMappings()
                 .put("delta.checkpoint-filtering.enabled", "false")
                 .put("delta.checkpoint-row-statistics-writing.enabled", "false")
                 .put("delta.vacuum.min-retention", "13h")
+                .put("delta.vacuum.transaction-logging.enabled", "true")
                 .put("delta.hive-catalog-name", "hive")
                 .put("delta.dynamic-filtering.wait-timeout", "30m")
                 .put("delta.table-statistics-enabled", "false")
@@ -139,6 +141,7 @@ public void testExplicitPropertyMappings()
                 .setCheckpointRowStatisticsWritingEnabled(false)
                 .setCheckpointFilteringEnabled(false)
                 .setVacuumMinRetention(new Duration(13, HOURS))
+                .setVacuumTransactionLoggingEnabled(true)
                 .setHiveCatalogName("hive")
                 .setDynamicFilteringWaitTimeout(new Duration(30, MINUTES))
                 .setTableStatisticsEnabled(false)
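Usage sketch, not part of the patch above: with the new delta.vacuum.transaction-logging.enabled catalog property (or the vacuum_transaction_logging_enabled session property) turned on, a vacuum run writes VACUUM START and VACUUM END commit entries that show up in the table's $history view, as the smoke test above verifies. The catalog, schema, and table names below are hypothetical; the property name, procedure arguments, and $history columns come from the change itself.

    -- assumes a Delta Lake catalog named "delta" and an existing table delta.examples.orders
    SET SESSION delta.vacuum_transaction_logging_enabled = true;
    CALL delta.system.vacuum(schema_name => 'examples', table_name => 'orders', retention => '7d');
    SELECT version, operation, operation_parameters, operation_metrics
    FROM delta.examples."orders$history"
    ORDER BY version;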