Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-28793: Expire query history snapshots #5666

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -5781,6 +5781,9 @@ public static enum ConfVars {
HIVE_QUERY_HISTORY_REPOSITORY_CLASS("hive.query.history.repository.class",
"org.apache.hadoop.hive.ql.queryhistory.repository.IcebergRepository",
"The class implementing QueryHistoryRepository to be used for persisting Record instances"),
HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INVERVAL_SECONDS(
"hive.query.history.iceberg.snapshot.expiry.interval.seconds", 3600,
"The iceberg repository of query history service expires snapshots periodically according to this config."),
HIVE_SECURITY_AUTHORIZATION_SCHEDULED_QUERIES_SUPPORTED("hive.security.authorization.scheduled.queries.supported",
false,
"Enable this if the configured authorizer is able to handle scheduled query related calls."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,86 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ServiceContext;
import org.apache.hadoop.hive.ql.queryhistory.QueryHistoryService;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.queryhistory.repository.QueryHistoryRepository;
import org.apache.hadoop.hive.ql.queryhistory.schema.DummyRecord;
import org.apache.hadoop.hive.ql.queryhistory.schema.IcebergRecord;
import org.apache.hadoop.hive.ql.queryhistory.schema.Record;
import org.apache.hadoop.hive.ql.queryhistory.schema.Schema;
import org.apache.hadoop.hive.ql.queryhistory.schema.QueryHistorySchemaTestUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.common.util.ReflectionUtil;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.mr.mapred.Container;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestIcebergRepository {
private static final Logger LOG = LoggerFactory.getLogger(TestIcebergRepository.class);
private static final ServiceContext serviceContext = new ServiceContext(() -> DummyRecord.SERVER_HOST,
() -> DummyRecord.SERVER_PORT);
private final Queue<Record> queryHistoryQueue = new LinkedBlockingQueue<>();

private HiveConf conf;
private IcebergRepository repository;

/*
* This unit test asserts that the created record is persisted as expected and the values made their way to the
* iceberg table.
*/
@Test
public void testPersistRecord() throws Exception {
HiveConf conf = new HiveConf();
conf.set("iceberg.engine.hive.lock-enabled", "false");
Record historyRecord = new DummyRecord();

conf.setBoolVar(HiveConf.ConfVars.HIVE_CLI_TEZ_INITIALIZE_SESSION, false);
conf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_BATCH_SIZE, 0);
createIcebergRepository(conf);

Record record = new DummyRecord();
queryHistoryQueue.add(historyRecord);
repository.flush(queryHistoryQueue);

QueryHistoryService service = QueryHistoryService.newInstance(conf, serviceContext);
service.start();
IcebergRepository repository = (IcebergRepository) service.getRepository();
checkRecords(conf, repository, historyRecord);
}

queryHistoryQueue.add(record);
repository.flush(queryHistoryQueue);
/**
 * Builds a HiveConf preconfigured with sensible defaults for iceberg repository tests.
 * Individual test cases are free to tweak the returned instance before use.
 * @return a freshly created, test-ready HiveConf
 */
private HiveConf newHiveConf() {
  HiveConf hiveConf = new HiveConf();
  // batch size 0 makes flush persist records synchronously, so asserts can run right after
  hiveConf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_BATCH_SIZE, 0);
  hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CLI_TEZ_INITIALIZE_SESSION, false);
  hiveConf.set("iceberg.engine.hive.lock-enabled", "false");
  hiveConf.setVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_REPOSITORY_CLASS, IcebergRepositoryForTest.class.getName());
  return hiveConf;
}

checkRecords(conf, repository, record);
/**
 * Reflectively instantiates and initializes the repository implementation named by
 * HIVE_QUERY_HISTORY_REPOSITORY_CLASS, mirroring how the query history service creates it.
 */
private void createIcebergRepository(HiveConf conf) {
  String repositoryClassName = conf.get(HiveConf.ConfVars.HIVE_QUERY_HISTORY_REPOSITORY_CLASS.varname);
  try {
    repository = (IcebergRepository) ReflectionUtil.newInstance(conf.getClassByName(repositoryClassName), conf);
    repository.init(conf, new Schema());
  } catch (ClassNotFoundException e) {
    // a misconfigured class name is a test setup error, surface it as unchecked
    throw new RuntimeException(e);
  }
}

private void checkRecords(HiveConf conf, IcebergRepository repository, Record record)
private void checkRecords(HiveConf conf, IcebergRepository repository, Record historyRecord)
throws Exception {
JobConf jobConf = new JobConf(conf);
// force table to be reloaded from Catalogs to see latest Snapshot
Expand All @@ -82,14 +108,14 @@ private void checkRecords(HiveConf conf, IcebergRepository repository, Record re
Record deserialized = new IcebergRecord((GenericRecord) container.get());

Assert.assertTrue("Original and deserialized records should contain equal values",
QueryHistorySchemaTestUtils.queryHistoryRecordsAreEqual(record, deserialized));
QueryHistorySchemaTestUtils.queryHistoryRecordsAreEqual(historyRecord, deserialized));
}

private Container readRecords(IcebergRepository repository, JobConf jobConf) throws Exception {
InputFormat<?, ?> inputFormat = repository.storageHandler.getInputFormatClass().newInstance();
String tableLocation = repository.tableDesc.getProperties().get("location").toString();
String tableLocation = getTableLocation(repository);
File[] dataFiles =
new File(tableLocation.replaceAll("^[a-zA-Z]+:", "") +
new File(tableLocation +
"/data/cluster_id=" + DummyRecord.CLUSTER_ID).listFiles(
file -> file.isFile() && file.getName().toLowerCase().endsWith(".orc"));
FileInputFormat.setInputPaths(jobConf, new Path(dataFiles[0].toURI()));
Expand All @@ -107,4 +133,78 @@ private Container readRecords(IcebergRepository repository, JobConf jobConf) thr
}
return container;
}

/**
 * Returns the filesystem path of the repository's backing table, with any URI scheme
 * prefix (e.g. "file:") stripped so the result is usable with java.io.File.
 */
private String getTableLocation(IcebergRepository repository) {
  String location = repository.tableDesc.getProperties().get("location").toString();
  return location.replaceAll("^[a-zA-Z]+:", "");
}

/*
 * Asserts that the periodic snapshot expiry task of the iceberg repository removes old
 * snapshots: two flushes create two snapshots, after which expiry shrinks them back to one.
 */
@Test
public void testExpireSnapshots() throws Exception {
  conf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INVERVAL_SECONDS, 1);
  conf.setVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_REPOSITORY_CLASS,
      IcebergRepositoryWithShortSnapshotAge.class.getName());
  createIcebergRepository(conf);

  String metadataDirectory = getTableLocation(repository) + "/metadata";
  assertSnapshotFiles(metadataDirectory, 0);
  Record historyRecord = new DummyRecord();

  // flush a record, 1 snapshot is visible
  queryHistoryQueue.add(historyRecord);
  repository.flush(queryHistoryQueue);
  assertSnapshotFiles(metadataDirectory, 1);

  // flush another record, 2 snapshots are visible
  queryHistoryQueue.add(historyRecord);
  repository.flush(queryHistoryQueue);
  assertSnapshotFiles(metadataDirectory, 2);

  // Wait for the expiry to take effect: poll with a generous deadline instead of a fixed
  // Thread.sleep(3000), which is both slower on fast machines and flaky on loaded ones.
  long deadlineMillis = System.currentTimeMillis() + 30_000;
  while (countSnapshotFiles(metadataDirectory) != 1 && System.currentTimeMillis() < deadlineMillis) {
    Thread.sleep(200);
  }
  assertSnapshotFiles(metadataDirectory, 1);
}

/** Counts the snapshot files ("snap-*") currently present in the given metadata directory. */
private int countSnapshotFiles(String metadataDirectory) {
  File[] matchingFiles = new File(metadataDirectory).listFiles((dir, name) -> name.startsWith("snap-"));
  return matchingFiles == null ? 0 : matchingFiles.length;
}

/**
 * Asserts that the table's metadata directory contains exactly the expected number of
 * snapshot files (files whose name starts with "snap-").
 *
 * @param metadataDirectory path of the iceberg table's metadata directory
 * @param expectedSnapshotFiles the number of snapshot files that must be present
 */
private void assertSnapshotFiles(String metadataDirectory, int expectedSnapshotFiles) {
  File[] matchingFiles = new File(metadataDirectory).listFiles((dir, name) -> name.startsWith("snap-"));
  // listFiles returns null for a non-existent directory; treat that as "no snapshots"
  List<File> files = Optional.ofNullable(matchingFiles).map(Arrays::asList).orElse(Collections.emptyList());
  LOG.debug("Snapshot files found: {}", files);
  Assert.assertEquals("Unexpected number of snapshot files in " + metadataDirectory,
      expectedSnapshotFiles, files.size());
}

/**
 * Iceberg repository flavour used by these tests: marks the query history table as purgeable,
 * so dropping it between test cases also removes its data and metadata files.
 */
public static class IcebergRepositoryForTest extends IcebergRepository {
  @Override
  protected Table createTableObject() {
    // the purge flag prevents data/metadata collisions across consecutive test cases
    Table queryHistoryTable = super.createTableObject();
    queryHistoryTable.setProperty("external.table.purge", "TRUE");
    return queryHistoryTable;
  }
}

/**
 * Repository variant with a very low snapshot max age. The max age is not exposed as a plain
 * Hive configuration option (it is normally adjusted via ALTER TABLE), so the test overrides
 * the accessor directly to make snapshot expiry observable quickly.
 */
public static class IcebergRepositoryWithShortSnapshotAge extends IcebergRepositoryForTest {
  // snapshots older than half a second become eligible for expiry
  private static final int SHORT_SNAPSHOT_MAX_AGE_MS = 500;

  @Override
  int getSnapshotMaxAge() {
    return SHORT_SNAPSHOT_MAX_AGE_MS;
  }
}

// Fresh configuration and Hive session for every test case. SessionState.start installs the
// thread-local session state that the repository code reads during init/flush.
@Before
public void before() {
conf = newHiveConf();
SessionState.start(conf);
}

// Drop the query history table (ifPurge=true) so the next test case starts from a clean slate;
// only needed when a repository was actually created by the test.
@After
public void after() throws Exception {
if (repository != null) {
Hive.get().dropTable(QueryHistoryRepository.QUERY_HISTORY_DB_TABLE_NAME, true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class AbstractRepository implements QueryHistoryRepository {
HiveConf conf;
protected Schema schema;
private Warehouse warehouse;
protected Table table;

public void init(HiveConf conf, Schema schema) {
this.conf = conf;
Expand All @@ -59,8 +60,8 @@ public void init(HiveConf conf, Schema schema) {

try (Hive hive = Hive.get(conf)) {
Database database = initDatabase(hive);
Table table = initTable(hive, database);
postInitTable(table);
initTable(hive, database);
postInitTable();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we trigger postInitTable within the initTable?

} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -97,21 +98,19 @@ private String getDatabaseLocation(String databaseName) throws Exception {
return warehouse.getDefaultExternalDatabasePath(databaseName).toUri().toString();
}

protected Table initTable(Hive hive, Database db) {
Table table;
protected void initTable(Hive hive, Database db) {
try {
table = hive.getTable(QUERY_HISTORY_DB_NAME, QUERY_HISTORY_TABLE_NAME, null, false);
if (table == null) {
LOG.info("Query history table ({}) isn't created yet", QUERY_HISTORY_TABLE_NAME);
table = createTable(hive, db);
}
return table;
} catch (HiveException e) {
throw new RuntimeException(e);
}
}

protected abstract void postInitTable(Table table) throws Exception;
protected abstract void postInitTable() throws Exception;

/**
* Supposed to create the query history table in metastore. It's only called from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.queryhistory.schema.Record;
import org.apache.hadoop.hive.ql.queryhistory.schema.Schema;
Expand All @@ -57,6 +58,10 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;

public class IcebergRepository extends AbstractRepository implements QueryHistoryRepository {
private static final Logger LOG = LoggerFactory.getLogger(IcebergRepository.class);
Expand All @@ -70,12 +75,17 @@ public class IcebergRepository extends AbstractRepository implements QueryHistor
@VisibleForTesting
TableDesc tableDesc;
private ExecutorService icebergExecutor;
private final ScheduledExecutorService snapshotExpiryExecutor = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("IcebergRepository periodic snapshot expiry").setDaemon(true).build());

/**
 * Initializes the repository and starts its background machinery: a small daemon worker pool
 * for iceberg writes, plus the periodically scheduled snapshot expiry task (first run fires
 * immediately, then repeats at the configured interval).
 */
@Override
public void init(HiveConf conf, Schema schema) {
  super.init(conf, schema);

  // daemon threads so the pool never keeps the JVM alive on shutdown
  icebergExecutor = Executors.newFixedThreadPool(2,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(ICEBERG_WORKER_THREAD_NAME_FORMAT).build());

  int expiryIntervalSeconds = HiveConf.getIntVar(conf,
      HiveConf.ConfVars.HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INVERVAL_SECONDS);
  snapshotExpiryExecutor.scheduleAtFixedRate(this::expireSnapshots, 0, expiryIntervalSeconds, TimeUnit.SECONDS);
}

@Override
Expand All @@ -89,6 +99,7 @@ protected Table createTable(Hive hive, Database db) throws HiveException {
ICEBERG_STORAGE_HANDLER);
table.setProperty("table_type", "ICEBERG");
table.setProperty("write.format.default", "orc");
table.setProperty("history.expire.max-snapshot-age-ms", Integer.toString(getSnapshotMaxAge()));
table.setProperty(hive_metastoreConstants.META_TABLE_NAME, QUERY_HISTORY_DB_TABLE_NAME);

table.setFields(schema.getFields());
Expand All @@ -101,8 +112,14 @@ protected Table createTable(Hive hive, Database db) throws HiveException {
return table;
}

/**
 * Default snapshot retention in milliseconds: snapshots older than one day are eligible for
 * expiry. Overridable by tests to exercise the expiry path quickly.
 */
@VisibleForTesting
int getSnapshotMaxAge() {
  return (int) TimeUnit.DAYS.toMillis(1);
}

@Override
protected void postInitTable(Table table) throws Exception {
protected void postInitTable() throws Exception {
this.tableDesc = Utilities.getTableDesc(table);

Map<String, String> map = new HashMap<>();
Expand Down Expand Up @@ -176,4 +193,13 @@ private void prepareConfForWrite() {
SessionStateUtil.addCommitInfo(SessionState.getSessionConf(), tableDesc.getTableName(), jobId, 1,
Maps.fromProperties(tableDesc.getProperties()));
}

private void expireSnapshots() {
Copy link
Member

@deniskuzZ deniskuzZ Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have an AcidHouseKeeperService, I think we should create a similar IcebergHouseKeeperService and schedule periodic snapshot expiry on Iceberg tables.

You can define the expiry policy for the query-history table and IcebergHouseKeeperService / IcebergTableExpiryService would take care of it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, theoretically, handling them in a centralized place in the metastore instead of multiple HS2s looks better from a system design point of view;
however, it needs a new service to be implemented — let me check

Copy link
Member

@deniskuzZ deniskuzZ Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for a "NEW" service, add it to the list of TASK_THREADS_REMOTE_ONLY
but yes, that service should iterate over all iceberg tables with expiry configs and periodically trigger the snapshot expiry action

take a look at AcidTxnCleanerService

  public long runFrequency(TimeUnit unit) {
    return INTERVAL;
  }

  public void run() {
     doTheMagic()
  }

Copy link
Contributor Author

@abstractdog abstractdog Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took a look, my concern about this whole service is exactly the doTheMagic part: I remember we faced lots of problems with e.g. the PartitionManagementTask because it couldn't have been run in a lightweight fashion, and the common thing is that we need to iterate on all tables of all DBs to achieve what we want here: the best case scenario is that we implement a filter on ICEBERG tables (like: listTableNamesByFilter), kinda pushing down the filter to backend db
but even if we do that, we still need lots of code to make it as optimal as possible (just take a look at how complicated the partition management task filtering logic is to achieve the same).
So looking at the implementations so far, the "getting all iceberg tables" is the very part I'm worried about, and makes me wonder I really want to go towards that direction just the sake of this query history snapshot expiry, which is otherwise a very simple code

Focusing on the motivation, which is to handle the query history table—fully loaded and maintained by Hive—we owe it to users to optimize it as much as possible. However, the question is whether we should take on the additional burden in this PR to handle all Iceberg tables visible in the metastore, or keep the scope focused for now.

Or maybe a similar db + table name filter (that's used in the part mgmt task), but we default it to: "sys" + "query_history" in MetastoreConf and give the user the power to set it even to * + * ?

Copy link
Member

@deniskuzZ deniskuzZ Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. you don't need to go down to partition level;
  2. you can get list of iceberg tables from HMS, there is an API for that;
  3. yes, service should be optimized.
  • check the last housekeeping event,
  • check if there were any changes to the table since last attempt (check if metadata file hasn't changed),
  • support table name filtering;
  1. adding it just for query history, doesn't bring much value and is not generic;
  2. downstream you can configure all policies in DLM so you don't need this at all;

LOG.debug("Attempting to expire snapshots for table: {}", table.getFullTableName());
try {
storageHandler.executeOperation(table, new AlterTableExecuteSpec<>(EXPIRE_SNAPSHOT, null));
} catch (Exception e) {
LOG.warn("Failed to expire snapshots", e);
}
}
}
Loading