diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 107a848e78cd..1d4962637a76 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5781,6 +5781,9 @@ public static enum ConfVars {
     HIVE_QUERY_HISTORY_REPOSITORY_CLASS("hive.query.history.repository.class",
         "org.apache.hadoop.hive.ql.queryhistory.repository.IcebergRepository",
         "The class implementing QueryHistoryRepository to be used for persisting Record instances"),
+    HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INTERVAL_SECONDS(
+        "hive.query.history.iceberg.snapshot.expiry.interval.seconds", 3600,
+        "The interval in seconds at which the Iceberg repository of the query history service expires snapshots."),
     HIVE_SECURITY_AUTHORIZATION_SCHEDULED_QUERIES_SUPPORTED("hive.security.authorization.scheduled.queries.supported",
         false,
         "Enable this if the configured authorizer is able to handle scheduled query related calls."),
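The new setting defaults to 3600, i.e. hourly expiry. A minimal sketch of overriding it programmatically, assuming the `..._INTERVAL_SECONDS` enum constant from the hunk above (the 600-second value is only an example); the same key can also be set in hive-site.xml as `hive.query.history.iceberg.snapshot.expiry.interval.seconds`:

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class ExpiryIntervalExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Run query-history snapshot expiry every 10 minutes instead of the default hour.
    conf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INTERVAL_SECONDS, 600);
    System.out.println(conf.getIntVar(
        HiveConf.ConfVars.HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INTERVAL_SECONDS));
  }
}
```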
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/queryhistory/repository/TestIcebergRepository.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/queryhistory/repository/TestIcebergRepository.java
index f9ac04502dbc..b7c128735f8b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/queryhistory/repository/TestIcebergRepository.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/queryhistory/repository/TestIcebergRepository.java
@@ -19,60 +19,86 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ServiceContext;
-import org.apache.hadoop.hive.ql.queryhistory.QueryHistoryService;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.queryhistory.repository.QueryHistoryRepository;
 import org.apache.hadoop.hive.ql.queryhistory.schema.DummyRecord;
 import org.apache.hadoop.hive.ql.queryhistory.schema.IcebergRecord;
 import org.apache.hadoop.hive.ql.queryhistory.schema.Record;
+import org.apache.hadoop.hive.ql.queryhistory.schema.Schema;
 import org.apache.hadoop.hive.ql.queryhistory.schema.QueryHistorySchemaTestUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.mr.mapred.Container;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 public class TestIcebergRepository {
   private static final Logger LOG = LoggerFactory.getLogger(TestIcebergRepository.class);
 
-  private static final ServiceContext serviceContext = new ServiceContext(() -> DummyRecord.SERVER_HOST,
-      () -> DummyRecord.SERVER_PORT);
   private final Queue<Record> queryHistoryQueue = new LinkedBlockingQueue<>();
 
+  private HiveConf conf;
+  private IcebergRepository repository;
+
   /*
    * This unit test asserts that the created record is persisted as expected and the values made their way to the
    * iceberg table.
    */
   @Test
   public void testPersistRecord() throws Exception {
-    HiveConf conf = new HiveConf();
-    conf.set("iceberg.engine.hive.lock-enabled", "false");
+    Record historyRecord = new DummyRecord();
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_CLI_TEZ_INITIALIZE_SESSION, false);
-    conf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_BATCH_SIZE, 0);
+    createIcebergRepository(conf);
-    Record record = new DummyRecord();
+    queryHistoryQueue.add(historyRecord);
+    repository.flush(queryHistoryQueue);
-    QueryHistoryService service = QueryHistoryService.newInstance(conf, serviceContext);
-    service.start();
-    IcebergRepository repository = (IcebergRepository) service.getRepository();
+    checkRecords(conf, repository, historyRecord);
   }
 
-    queryHistoryQueue.add(record);
-    repository.flush(queryHistoryQueue);
+  /**
+   * Useful defaults for iceberg repository testing. Can be overridden in test cases if needed.
+   * @return HiveConf to be used
+   */
+  private HiveConf newHiveConf() {
+    HiveConf testConf = new HiveConf();
+    testConf.set("iceberg.engine.hive.lock-enabled", "false");
+    testConf.setBoolVar(HiveConf.ConfVars.HIVE_CLI_TEZ_INITIALIZE_SESSION, false);
+    testConf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_BATCH_SIZE, 0); // sync persist on flush
+    testConf.setVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_REPOSITORY_CLASS, IcebergRepositoryForTest.class.getName());
+    return testConf;
+  }
 
-    checkRecords(conf, repository, record);
+  private void createIcebergRepository(HiveConf conf) {
+    try {
+      repository = (IcebergRepository) ReflectionUtil.newInstance(
+          conf.getClassByName(conf.get(HiveConf.ConfVars.HIVE_QUERY_HISTORY_REPOSITORY_CLASS.varname)), conf);
+      repository.init(conf, new Schema());
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
   }
 
-  private void checkRecords(HiveConf conf, IcebergRepository repository, Record record)
+  private void checkRecords(HiveConf conf, IcebergRepository repository, Record historyRecord)
       throws Exception {
     JobConf jobConf = new JobConf(conf); // force table to be reloaded from Catalogs to see latest Snapshot
@@ -82,14 +108,14 @@ private void checkRecords(HiveConf conf, IcebergRepository repository, Record re
     Record deserialized = new IcebergRecord((GenericRecord) container.get());
 
     Assert.assertTrue("Original and deserialized records should contain equal values",
-        QueryHistorySchemaTestUtils.queryHistoryRecordsAreEqual(record, deserialized));
+        QueryHistorySchemaTestUtils.queryHistoryRecordsAreEqual(historyRecord, deserialized));
   }
 
   private Container readRecords(IcebergRepository repository, JobConf jobConf) throws Exception {
     InputFormat inputFormat = repository.storageHandler.getInputFormatClass().newInstance();
-    String tableLocation = repository.tableDesc.getProperties().get("location").toString();
+    String tableLocation = getTableLocation(repository);
     File[] dataFiles =
-        new File(tableLocation.replaceAll("^[a-zA-Z]+:", "") +
+        new File(tableLocation +
             "/data/cluster_id=" + DummyRecord.CLUSTER_ID).listFiles(
                 file -> file.isFile() && file.getName().toLowerCase().endsWith(".orc"));
     FileInputFormat.setInputPaths(jobConf, new Path(dataFiles[0].toURI()));
@@ -107,4 +133,78 @@ private Container readRecords(IcebergRepository repository, JobConf jobConf) thr
     }
     return container;
   }
+
+  private String getTableLocation(IcebergRepository repository) {
+    return repository.tableDesc.getProperties().get("location").toString().replaceAll("^[a-zA-Z]+:", "");
+  }
+
+  @Test
+  public void testExpireSnapshots() throws Exception {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INTERVAL_SECONDS, 1);
+    conf.setVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_REPOSITORY_CLASS,
+        IcebergRepositoryWithShortSnapshotAge.class.getName());
+    createIcebergRepository(conf);
+
+    String metadataDirectory = getTableLocation(repository) + "/metadata";
+    assertSnapshotFiles(metadataDirectory, 0);
+    Record historyRecord = new DummyRecord();
+
+    // flush a record, 1 snapshot is visible
+    queryHistoryQueue.add(historyRecord);
+    repository.flush(queryHistoryQueue);
+    assertSnapshotFiles(metadataDirectory, 1);
+
+    // flush another record, 2 snapshots are visible
+    queryHistoryQueue.add(historyRecord);
+    repository.flush(queryHistoryQueue);
+    assertSnapshotFiles(metadataDirectory, 2);
+
+    // wait for the expiry to take effect, after which only one snapshot will be visible again
+    Thread.sleep(3000);
+    assertSnapshotFiles(metadataDirectory, 1);
+  }
+
+  private void assertSnapshotFiles(String metadataDirectory, int numberOfSnapshotFiles) {
+    File[] matchingFiles = new File(metadataDirectory).listFiles((dir, name) -> name.startsWith("snap-"));
+    List<File> files = Optional.ofNullable(matchingFiles).map(Arrays::asList).orElse(Collections.emptyList());
+    LOG.debug("Snapshot files found: {}", files);
+    Assert.assertEquals(numberOfSnapshotFiles, files.size());
+  }
+
+  /**
+   * Base testing iceberg repository class that takes care of table purge between unit test cases.
+   */
+  public static class IcebergRepositoryForTest extends IcebergRepository {
+    @Override
+    protected Table createTableObject() {
+      Table table = super.createTableObject();
+      table.setProperty("external.table.purge", "TRUE"); // cleanup between tests to prevent data/metadata collision
+      return table;
+    }
+  }
+
+  /**
+   * Test repository extending IcebergRepositoryForTest with a short snapshot max age. The snapshot age property is
+   * not exposed as a Hive configuration; instead, it is meant to be changed via ALTER TABLE commands, so we need a
+   * low value here to test snapshot expiry.
+   */
+  public static class IcebergRepositoryWithShortSnapshotAge extends IcebergRepositoryForTest {
+    @Override
+    int getSnapshotMaxAge() {
+      return 500;
+    }
+  }
+
+  @Before
+  public void before() {
+    conf = newHiveConf();
+    SessionState.start(conf);
+  }
+
+  @After
+  public void after() throws Exception {
+    if (repository != null) {
+      Hive.get().dropTable(QueryHistoryRepository.QUERY_HISTORY_DB_TABLE_NAME, true);
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/AbstractRepository.java b/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/AbstractRepository.java
index cd4a22ea3e91..c8c198ef3c99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/AbstractRepository.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/AbstractRepository.java
@@ -46,6 +46,7 @@ public abstract class AbstractRepository implements QueryHistoryRepository {
   HiveConf conf;
   protected Schema schema;
   private Warehouse warehouse;
+  protected Table table;
 
   public void init(HiveConf conf, Schema schema) {
     this.conf = conf;
@@ -59,8 +60,8 @@ public void init(HiveConf conf, Schema schema) {
     try (Hive hive = Hive.get(conf)) {
       Database database = initDatabase(hive);
 
-      Table table = initTable(hive, database);
-      postInitTable(table);
+      initTable(hive, database);
+      postInitTable();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -97,21 +98,19 @@ private String getDatabaseLocation(String databaseName) throws Exception {
     return warehouse.getDefaultExternalDatabasePath(databaseName).toUri().toString();
   }
 
-  protected Table initTable(Hive hive, Database db) {
-    Table table;
+  protected void initTable(Hive hive, Database db) {
     try {
       table = hive.getTable(QUERY_HISTORY_DB_NAME, QUERY_HISTORY_TABLE_NAME, null, false);
       if (table == null) {
         LOG.info("Query history table ({}) isn't created yet", QUERY_HISTORY_TABLE_NAME);
         table = createTable(hive, db);
       }
-      return table;
     } catch (HiveException e) {
       throw new RuntimeException(e);
     }
   }
 
-  protected abstract void postInitTable(Table table) throws Exception;
+  protected abstract void postInitTable() throws Exception;
 
   /**
    * Supposed to create the query history table in metastore. It's only called from
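To make the contract change concrete: `table` was a local variable passed into `postInitTable(Table)`; it is now a protected field populated by `initTable()` and read by subclasses and, later in this patch, by the background expiry task. A standalone mini-model of the reshaped flow (hypothetical classes for illustration only; the real signatures are the ones in the hunks above):

```java
// Hypothetical mini-model of the AbstractRepository refactor; not Hive code.
abstract class RepositoryModel {
  protected String table; // was a local variable, now shared state

  final void init() {
    initTable();     // populates this.table (loads or creates the real table)
    postInitTable(); // subclasses read this.table; no Table parameter anymore
  }

  void initTable() {
    table = "sys.query_history";
  }

  protected abstract void postInitTable();
}

class IcebergRepositoryModel extends RepositoryModel {
  @Override
  protected void postInitTable() {
    // A periodic background task can also reach this.table now.
    System.out.println("table ready: " + table);
  }

  public static void main(String[] args) {
    new IcebergRepositoryModel().init();
  }
}
```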
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/IcebergRepository.java b/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/IcebergRepository.java
index 38a57768f6ae..9e5a80168ced 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/IcebergRepository.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/queryhistory/repository/IcebergRepository.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.queryhistory.schema.Record;
 import org.apache.hadoop.hive.ql.queryhistory.schema.Schema;
@@ -57,6 +58,10 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
 
 public class IcebergRepository extends AbstractRepository implements QueryHistoryRepository {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergRepository.class);
@@ -70,12 +75,17 @@ public class IcebergRepository extends AbstractRepository implements QueryHistor
   @VisibleForTesting
   TableDesc tableDesc;
   private ExecutorService icebergExecutor;
+  private final ScheduledExecutorService snapshotExpiryExecutor = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("IcebergRepository periodic snapshot expiry").setDaemon(true).build());
 
   @Override
   public void init(HiveConf conf, Schema schema) {
     super.init(conf, schema);
     icebergExecutor = Executors.newFixedThreadPool(2,
         (new ThreadFactoryBuilder()).setDaemon(true).setNameFormat(ICEBERG_WORKER_THREAD_NAME_FORMAT).build());
+    int expiryInterval = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_QUERY_HISTORY_ICEBERG_SNAPSHOT_EXPIRY_INTERVAL_SECONDS);
+    snapshotExpiryExecutor.scheduleAtFixedRate(this::expireSnapshots, 0, expiryInterval, TimeUnit.SECONDS);
   }
 
   @Override
@@ -89,6 +99,7 @@ protected Table createTable(Hive hive, Database db) throws HiveException {
         ICEBERG_STORAGE_HANDLER);
     table.setProperty("table_type", "ICEBERG");
     table.setProperty("write.format.default", "orc");
+    table.setProperty("history.expire.max-snapshot-age-ms", Integer.toString(getSnapshotMaxAge()));
     table.setProperty(hive_metastoreConstants.META_TABLE_NAME, QUERY_HISTORY_DB_TABLE_NAME);
 
     table.setFields(schema.getFields());
@@ -101,8 +112,14 @@
     return table;
   }
 
+  @VisibleForTesting
+  int getSnapshotMaxAge() {
+    // expire/delete snapshots older than 1 day
+    return 24 * 60 * 60 * 1000;
+  }
+
   @Override
-  protected void postInitTable(Table table) throws Exception {
+  protected void postInitTable() throws Exception {
     this.tableDesc = Utilities.getTableDesc(table);
 
     Map<String, String> map = new HashMap<>();
@@ -176,4 +193,13 @@ private void prepareConfForWrite() {
     SessionStateUtil.addCommitInfo(SessionState.getSessionConf(), tableDesc.getTableName(), jobId, 1,
         Maps.fromProperties(tableDesc.getProperties()));
   }
+
+  private void expireSnapshots() {
+    LOG.debug("Attempting to expire snapshots for table: {}", table.getFullTableName());
+    try {
+      storageHandler.executeOperation(table, new AlterTableExecuteSpec<>(EXPIRE_SNAPSHOT, null));
+    } catch (Exception e) {
+      LOG.warn("Failed to expire snapshots", e);
+    }
+  }
 }
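On the expiry call itself: the null operation spec presumably lets the handler fall back to the table's own settings, here the `history.expire.max-snapshot-age-ms` property set in `createTable()` (one day by default, 500 ms in the test subclass). A rough sketch of the equivalent effect through the plain Iceberg API, for orientation only; this is not the code path the patch uses, which goes through `HiveStorageHandler.executeOperation`:

```java
import org.apache.iceberg.Table;

public class SnapshotExpirySketch {
  // Expire snapshots older than maxAgeMs, bounding the query-history table's metadata growth.
  static void expire(Table icebergTable, long maxAgeMs) {
    long olderThan = System.currentTimeMillis() - maxAgeMs; // cutoff derived from max snapshot age
    icebergTable.expireSnapshots()   // Iceberg's built-in ExpireSnapshots action
        .expireOlderThan(olderThan)  // drop snapshots past the cutoff
        .commit();                   // write a new metadata version without them
  }
}
```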