HIVE-28600: Iceberg: Check that table/partition requires compaction b… #5529

Draft
wants to merge 1 commit into base: master
10 changes: 8 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2229,12 +2229,18 @@ public static enum ConfVars {
HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4,
"The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" +
" defult DirectExecutorService"),

HIVE_ICEBERG_MASK_DEFAULT_LOCATION("hive.iceberg.mask.default.location", false,
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),

HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.major.compaction.file.size.threshold", "96mb",
new SizeValidator(), "Iceberg data file size threshold below which a file is considered for major compaction."),
HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.minor.compaction.file.size.threshold", "16mb",
new SizeValidator(), "Iceberg data file size threshold below which a file is considered for minor compaction."),
ICEBERG_COMPACTION_FILE_SIZE_RATIO("hive.iceberg.compaction.file.size.ratio", 0.1f,
"Ratio of # data files below threshold / # data files above threshold above which compaction is needed"),
ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD("hive.iceberg.delete.records.threshold", 100,
"Number of delete records in a table/partition above which a file needs to be compacted."),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
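For orientation, here is a minimal sketch (not part of the patch) of how the new file-size-ratio check behaves. The 96mb and 0.1 defaults come from the properties above; the file sizes are made-up illustration values, and the formula mirrors IcebergTableUtil.getFileSizeRatio introduced later in this change.

public class CompactionRatioExample {
  public static void main(String[] args) {
    long majorThresholdBytes = 96L * 1024 * 1024; // hive.iceberg.major.compaction.file.size.threshold = 96mb
    float ratioThreshold = 0.1f;                  // hive.iceberg.compaction.file.size.ratio = 0.1

    // Hypothetical data file sizes in one partition (illustrative only).
    long[] fileSizes = {10L << 20, 20L << 20, 200L << 20, 300L << 20, 400L << 20};

    long below = 0;
    long atOrAbove = 0;
    for (long size : fileSizes) {
      if (size < majorThresholdBytes) {
        below++;
      } else {
        atOrAbove++;
      }
    }

    // Same formula as getFileSizeRatio: files below the threshold divided by all data files.
    float ratio = below == 0 ? 0 : (atOrAbove == 0 ? 1 : below * 1.0f / (below + atOrAbove));
    System.out.printf("ratio=%.2f, needs compaction=%b%n", ratio, ratio > ratioThreshold);
    // Prints: ratio=0.40, needs compaction=true (2 of 5 files are below 96mb, and 0.4 > 0.1)
  }
}

With the default 0.1 ratio, a single small file is already enough to trigger compaction as long as the partition has fewer than ten data files in total.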
@@ -63,6 +63,7 @@
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
@@ -2232,4 +2233,52 @@ private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
private static List<FieldSchema> orderBy(VirtualColumn... exprs) {
return schema(Arrays.asList(exprs));
}

@Override
public boolean canCompact(HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table table, String partitionPath,
CompactionType compactionType) throws HiveException {
Table icebergTable = IcebergTableUtil.getTable(hiveConf, table.getTTable());
return canCompact(hiveConf, icebergTable, partitionPath, compactionType);
}

@VisibleForTesting
boolean canCompact(HiveConf hiveConf, Table icebergTable, String partitionPath,
CompactionType compactionType) throws HiveException {
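// Nothing to compact if the table has no snapshot yet. Otherwise check, in order: the delete-record count,
// the minimum number of data files (at least two), and finally the small-file ratio against the thresholds.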

if (icebergTable.currentSnapshot() == null) {
return false;
}

int deleteRecordsThreshold = HiveConf.getIntVar(hiveConf, ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD);
long deleteRecordsCount = IcebergTableUtil.countDeleteRecords(icebergTable, partitionPath);
if (deleteRecordsCount > deleteRecordsThreshold) {
return true;
}

int dataFilesCount = IcebergTableUtil.getDataFiles(icebergTable, partitionPath).size();
if (dataFilesCount < 2) {
return false;
}

long fileSizeInBytesThreshold;
switch (compactionType) {
case MAJOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
case MINOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
default:
throw new HiveException(String.format("Invalid compaction type: %s", compactionType.name()));
}

float fileSizeRatioThreshold = HiveConf.getFloatVar(hiveConf, ConfVars.ICEBERG_COMPACTION_FILE_SIZE_RATIO);
float fileSizeRatio = IcebergTableUtil.getFileSizeRatio(icebergTable, partitionPath, fileSizeInBytesThreshold);
return fileSizeRatio > fileSizeRatioThreshold;
}
}
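A minimal usage sketch of the new hook follows. It assumes a compaction initiator already holds the HiveConf and the Hive Table object; the CanCompactCaller class and shouldEnqueueMajorCompaction helper are hypothetical names used only for illustration, while canCompact and setConf come from the handler itself.

// Hypothetical caller, not part of this patch. Placed in the handler's package so no extra import is needed
// for HiveIcebergStorageHandler.
package org.apache.iceberg.mr.hive;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;

public class CanCompactCaller {

  /** Returns true if the given partition (or the whole table when partitionPath is null) should be compacted. */
  static boolean shouldEnqueueMajorCompaction(HiveConf conf, Table hiveTable, String partitionPath)
      throws HiveException {
    HiveIcebergStorageHandler handler = new HiveIcebergStorageHandler();
    handler.setConf(conf);
    // partitionPath is e.g. "last_name=Brown"; pass null for an unpartitioned table or a table-level check.
    return handler.canCompact(conf, hiveTable, partitionPath, CompactionType.MAJOR);
  }
}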
@@ -458,6 +458,52 @@ public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath)
t -> ((PositionDeletesScanTask) t).file()));
}

public static float getFileSizeRatio(Table table, String partitionPath, long fileSizeInBytesThreshold) {
long uncompactedFilesCount = getDataFileCount(table, partitionPath, fileSizeInBytesThreshold, true);
long compactedFilesCount = getDataFileCount(table, partitionPath, fileSizeInBytesThreshold, false);
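// The ratio is the fraction of data files below the size threshold: 0 when there are none, 1 when every file is below it.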

if (uncompactedFilesCount == 0) {
return 0;
} else if (compactedFilesCount == 0) {
return 1;
} else {
return uncompactedFilesCount * 1.0f / (uncompactedFilesCount + compactedFilesCount);
}
}

private static long getDataFileCount(Table table, String partitionPath, long fileSizeInBytesThreshold,
boolean isLess) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
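// Keep files belonging to the requested partition: an unpartitioned table matches everything; a null
// partitionPath on a partitioned table matches only files written under an older partition spec; otherwise
// the file's partition path must equal partitionPath. The size predicate is applied on top of that.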
return (!table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath)) &&
(isLess ? file.fileSizeInBytes() < fileSizeInBytesThreshold :
file.fileSizeInBytes() >= fileSizeInBytesThreshold);
});
return Lists.newArrayList(filteredFileScanTasks).size();
}

public static long countDeleteRecords(Table table, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
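// Apply the same partition-matching rule as getDataFileCount, here to position delete files.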
return !table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file().recordCount())).stream().mapToLong(Long::longValue).sum();
}

public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec)
throws SemanticException {
Map<String, PartitionField> partitionFieldMap = getPartitionFields(table).stream()
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestHiveIcebergCompaction {

private static TestHiveShell shell;
private TestTables testTables;
@Rule
public TemporaryFolder temp = new TemporaryFolder();

static final List<Record> CUSTOMER_RECORDS_1 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown")
.add(1L, "Bob", "Green")
.build();

static final List<Record> CUSTOMER_RECORDS_2 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(2L, "Bruce", "Brown")
.add(3L, "Trudy", "Green")
.add(4L, "Alex", "Pink")
.build();

static final List<Record> CUSTOMER_RECORDS_3 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(5L, "Bruce", "Blue")
.add(6L, "Trudy", "Blue")
.build();

static final List<Record> CUSTOMER_RECORDS_4 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown").build();

static final List<Record> CUSTOMER_RECORDS_5 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(2L, "Bruce", "Brown")
.build();

@BeforeClass
public static void beforeClass() {
shell = HiveIcebergStorageHandlerTestUtils.shell();
}

@AfterClass
public static void afterClass() throws Exception {
shell.stop();
}

@Before
public void before() throws IOException {
testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "tez");
HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}

@After
public void after() throws Exception {
HiveIcebergStorageHandlerTestUtils.close(shell);
ExecMapper.setDone(false);
}

@Test
public void testCanCompactPartitioned() {
PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.identity("last_name").build();

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, FileFormat.ORC, CUSTOMER_RECORDS_1, 2);

shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_2,
TableIdentifier.of("default", "customers"), false));

shell.executeStatement("DELETE FROM customers WHERE customer_id=3");

List<Object[]> objects = shell.executeStatement("SELECT * FROM customers ORDER BY customer_id");
Assert.assertEquals(4, objects.size());
List<Record> expected = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown")
.add(1L, "Bob", "Green")
.add(2L, "Bruce", "Brown")
.add(4L, "Alex", "Pink")
.build();
HiveIcebergTestUtils.validateData(expected,
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, objects), 0);

HiveConf conf = new HiveConf();
conf.setIntVar(HiveConf.ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD, 1);
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
storageHandler.setConf(conf);
table.refresh();

try {
/*
* Partition: 'last_name=Brown'.
* 2 data files, 0 delete files.
* Existing data size of the partition: 955 bytes
* 1st file size in bytes: 479
* 2nd file size in bytes: 476
*/

// Does not need compaction because the ratio of files below the threshold to all files is 0%: all files are compacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "10bytes");
assertFalse(storageHandler.canCompact(conf, table, "last_name=Brown", CompactionType.MAJOR));

// Needs compaction because the ratio of files below the threshold to all files is 50%, above the allowed 10%.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "477bytes");
assertTrue(storageHandler.canCompact(conf, table, "last_name=Brown", CompactionType.MAJOR));

// Needs compaction because the ratio of files below the threshold to all files is 100%: all files are uncompacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "1Mb");
assertTrue(storageHandler.canCompact(conf, table, "last_name=Brown", CompactionType.MAJOR));

// Delete records count is below threshold and only 1 data file, cannot compact.
assertFalse(storageHandler.canCompact(conf, table, "last_name=Green", CompactionType.MAJOR));

// No delete files, only one data file, cannot compact.
assertFalse(storageHandler.canCompact(conf, table, "last_name=Pink", CompactionType.MAJOR));

// Needs compaction because the delete record count (2) exceeds the threshold (1)
shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_3,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement("DELETE FROM customers WHERE customer_id=5");
shell.executeStatement("DELETE FROM customers WHERE customer_id=6");
table.refresh();
assertTrue(storageHandler.canCompact(conf, table, "last_name=Blue", CompactionType.MAJOR));
} catch (Exception e) {
fail("Exception is unexpected here");
}
}

@Test
public void testCanCompactUnpartitioned() {
PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).build();

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, FileFormat.ORC, null, 2);

HiveConf conf = new HiveConf();
conf.setIntVar(HiveConf.ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD, 1);
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
storageHandler.setConf(conf);

try {
// Zero data/delete files - cannot compact
assertFalse(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_4,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_5,
TableIdentifier.of("default", "customers"), false));
table.refresh();

/*
* 2 data files, 0 delete files.
* Existing data size of the table: 955 bytes
* 1st file size in bytes: 479
* 2nd file size in bytes: 476
*/

// Does not need compaction because the ratio of files below the threshold to all files is 0%: all files are compacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "10bytes");
assertFalse(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

// Needs compaction because the ratio of files below the threshold to all files is 50%, above the allowed 10%.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "477bytes");
assertTrue(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

// Needs compaction because the ratio of files below the threshold to all files is 100%: all files are uncompacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "1Mb");
assertTrue(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_1,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_2,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement("DELETE FROM customers where customer_id=0");
shell.executeStatement("DELETE FROM customers where customer_id=1");
table.refresh();

// Needs compaction because the delete record count (3) exceeds the threshold (1)
assertTrue(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));
} catch (Exception e) {
fail("Exception is unexpected here");
}
}
}