diff --git a/coral-integration-test/build.gradle b/coral-integration-test/build.gradle new file mode 100644 index 000000000..8ecd7eefe --- /dev/null +++ b/coral-integration-test/build.gradle @@ -0,0 +1,115 @@ +// Copyright 2019-2020 LinkedIn Corporation. All rights reserved. +// Licensed under the BSD-2 Clause license. +// See LICENSE in the project root for license information. + +apply plugin: 'java' + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +def jacksonVersion = '2.15.3' // Use a consistent version + +dependencies { + + implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" + implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" + implementation "com.fasterxml.jackson.module:jackson-module-scala_2.12:${jacksonVersion}" + + // Coral dependencies + testCompile project(':coral-hive') + testCompile project(':coral-spark') + testCompile project(':coral-schema') + testCompile project(':coral-trino') + + // Spark3 dependencies + testCompile(deps.'spark3'.'hive') { + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + + // Hive Metastore 2.0 dependencies + testCompile(deps.'hive2'.'hive-metastore') { + exclude group: 'org.apache.calcite', module: 'calcite-core' + exclude group: 'org.apache.calcite', module: 'calcite-avatica' + exclude group: 'org.apache.avro', module: 'avro-tools' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + testCompile(deps.'hive2'.'hive-exec-core') { + exclude group: 'org.apache.calcite', module: 'calcite-core' + exclude group: 'org.apache.calcite', module: 
'calcite-avatica' + exclude group: 'org.apache.avro', module: 'avro-tools' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + + // Iceberg dependencies + testCompile(deps.'iceberg'.'iceberg-core') { + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + testCompile(deps.'iceberg'.'iceberg-spark') { + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + testCompile(deps.'iceberg'.'iceberg-hive-metastore') { + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + // Add Iceberg test utilities for TestHiveMetastore + testCompile(deps.'iceberg'.'iceberg-hive-metastore' + ':tests') { + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } + + // Hadoop dependencies + testCompile deps.'hadoop'.'hadoop-common' + testCompile deps.'hadoop'.'hadoop-mapreduce-client-core' + + // Additional test dependencies + testCompile 'org.apache.derby:derby:10.14.2.0' + testCompile 'org.slf4j:slf4j-api:1.7.30' + testCompile 'org.slf4j:slf4j-log4j12:1.7.30' + + // Force consistent Jackson version (Spark 3.1.1 uses 2.10.0) + testCompile 'com.fasterxml.jackson.core:jackson-databind:2.10.0' + testCompile 
'com.fasterxml.jackson.core:jackson-core:2.10.0' + testCompile 'com.fasterxml.jackson.core:jackson-annotations:2.10.0' + testCompile 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.0' + testCompile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.10.0' +} + +// Needs to enforce using compatible versions +configurations.all { + resolutionStrategy { + force 'org.apache.avro:avro:1.10.2' + force 'com.google.guava:guava:27.0-jre' + // Force Jackson 2.10.0 - Spark 3.1.1 default version + force 'com.fasterxml.jackson.core:jackson-databind:2.10.0' + force 'com.fasterxml.jackson.core:jackson-core:2.10.0' + force 'com.fasterxml.jackson.core:jackson-annotations:2.10.0' + force 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.0' + force 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.10.0' + } +} + +test { + systemProperty 'derby.stream.error.file', 'build/derby.log' + systemProperty 'java.io.tmpdir', 'build/tmp' + + // Run tests sequentially to avoid resource contention + maxParallelForks = 1 + + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } +} diff --git a/coral-integration-test/src/test/java/com/linkedin/coral/integration/CoralIntegrationTestBase.java b/coral-integration-test/src/test/java/com/linkedin/coral/integration/CoralIntegrationTestBase.java new file mode 100644 index 000000000..d6c7470e0 --- /dev/null +++ b/coral-integration-test/src/test/java/com/linkedin/coral/integration/CoralIntegrationTestBase.java @@ -0,0 +1,290 @@ +/** + * Copyright 2019-2025 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. 
+ */ +package com.linkedin.coral.integration; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.calcite.rel.RelNode; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.parser.ParseException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import com.linkedin.coral.common.HiveMetastoreClient; +import com.linkedin.coral.hive.hive2rel.HiveToRelConverter; +import com.linkedin.coral.schema.avro.ViewToAvroSchemaConverter; +import com.linkedin.coral.spark.CoralSpark; +import com.linkedin.coral.trino.trino2rel.parsetree.TrinoParserDriver; + +import coral.shading.io.trino.sql.tree.Statement; + + +/** + * Abstract base class for integration tests using Coral with Spark 3, Iceberg, and Hive Metastore. + * Utilizes Iceberg's TestHiveMetastore for the Iceberg catalog and the Derby-based Hive Metastore from + * HiveMetastoreTestBase for the Spark catalog to simulate real-world table configurations. + * Also provides utilities to check if a query conforms to Coral translations.
+ */ +public class CoralIntegrationTestBase extends HiveMetastoreTestBase { + + protected SparkSession spark; + protected static TestHiveMetastore icebergTestHms; + protected HiveConf icebergHiveConf; + + /** + * Don't create a separate HiveMetaStoreClient - Spark will manage its own + */ + @Override + protected boolean shouldCreateMetastoreClient() { + return false; + } + + /** + * Create default database in Iceberg's TestHiveMetastore + */ + private void createIcebergDefaultDatabase() throws Exception { + org.apache.hadoop.hive.metastore.HiveMetaStoreClient client = + new org.apache.hadoop.hive.metastore.HiveMetaStoreClient(icebergHiveConf); + try { + // Check if default database already exists + try { + client.getDatabase("default"); + System.out.println("Default database already exists in Iceberg HMS"); + } catch (Exception e) { + // Default database doesn't exist, create it + String warehousePath = icebergHiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); + org.apache.hadoop.hive.metastore.api.Database defaultDb = new org.apache.hadoop.hive.metastore.api.Database( + "default", "Default database for Iceberg", warehousePath, null); + client.createDatabase(defaultDb); + System.out.println("Created default database in Iceberg HMS at: " + warehousePath); + } + } finally { + client.close(); + } + } + + @BeforeClass + public void setupSpark() throws Exception { + // Setup Derby-based HMS for Hive catalog (from parent class) + super.setupHiveMetastore(); + + // Setup Iceberg's TestHiveMetastore for Iceberg catalog + icebergTestHms = new TestHiveMetastore(); + icebergTestHms.start(); + icebergHiveConf = icebergTestHms.hiveConf(); + + // Create default database in Iceberg's HMS if it doesn't exist + createIcebergDefaultDatabase(); + + // Create SparkSession with TWO separate HMS instances + // - Iceberg catalog uses TestHiveMetastore (Iceberg's test utility) + // - Hive catalog uses Derby-based HMS (from HiveMetastoreTestBase) + spark = 
SparkSession.builder().appName("CoralIntegrationTest").master("local[2]") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + + // Iceberg catalog using TestHiveMetastore + .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.iceberg_catalog.type", "hive") + .config("spark.sql.catalog.iceberg_catalog.uri", icebergHiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)) + .config("spark.sql.catalog.iceberg_catalog.warehouse", + icebergHiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)) + + // Default Hive catalog using Derby-based HMS + .config("spark.sql.catalogImplementation", "hive").config("hive.metastore.uris", getHiveMetastoreUri()) + .config("spark.sql.warehouse.dir", getWarehouseDir()) + + // Spark configuration + .config("spark.sql.shuffle.partitions", "4").config("spark.ui.enabled", "false").enableHiveSupport() + .getOrCreate(); + + // Set log level to WARN to reduce noise + spark.sparkContext().setLogLevel("WARN"); + } + + @AfterClass + public void tearDownSpark() throws Exception { + // Stop Spark session first + if (spark != null) { + spark.stop(); + spark = null; + } + + // Give Spark time to release resources + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Stop Iceberg's TestHiveMetastore + if (icebergTestHms != null) { + icebergTestHms.stop(); + icebergTestHms = null; + } + + // Cleanup Derby-based HMS (from parent class) + super.tearDownHiveMetastore(); + } + + /** + * Get the SparkSession for this test. + * + * @return SparkSession instance + */ + protected SparkSession getSparkSession() { + return spark; + } + + /** + * Execute a SQL query and return the count of results. + * + * @param sql SQL query to execute + * @return Count of results + */ + protected long executeSqlAndGetCount(String sql) { + return spark.sql(sql).count(); + } + + /** + * Execute a SQL query. 
+ * For DDL statements, we need to collect() to force execution. + * + * @param sql SQL query to execute + */ + protected void executeSql(String sql) { + spark.sql(sql).collect(); + } + + /** + * Create a HiveMetastoreClient wrapper for Coral that delegates to both HMS clients. + * This wrapper checks both the Derby-based HMS (rawHmsClient) and Iceberg's TestHiveMetastore. + * + * @return HiveMetastoreClient for Coral + */ + protected HiveMetastoreClient createCoralHiveMetastoreClient() throws Exception { + HiveMetaStoreClient rawHmsClient = new HiveMetaStoreClient(hiveConf); + HiveMetaStoreClient icebergHmsClient = new HiveMetaStoreClient(icebergHiveConf); + + return new HiveMetastoreClient() { + @Override + public List getAllDatabases() { + try { + return rawHmsClient.getAllDatabases(); + } catch (Exception e) { + throw new RuntimeException("Failed to retrieve all databases", e); + } + } + + @Override + public Database getDatabase(String dbName) { + try { + return rawHmsClient.getDatabase(dbName); + } catch (Exception e) { + throw new RuntimeException("Failed to get database: " + dbName, e); + } + } + + @Override + public List getAllTables(String dbName) { + Set allTables = new HashSet<>(); + + // Get tables from rawHmsClient (Derby-based HMS for Hive tables) + try { + List rawTables = rawHmsClient.getAllTables(dbName); + if (rawTables != null) { + allTables.addAll(rawTables); + } + } catch (Exception e) { + // Ignore if database doesn't exist in rawHmsClient + } + + // Get tables from icebergHmsClient (TestHiveMetastore for Iceberg tables) + try { + List icebergTables = icebergHmsClient.getAllTables(dbName); + if (icebergTables != null) { + allTables.addAll(icebergTables); + } + } catch (Exception e) { + // Ignore if database doesn't exist in icebergHmsClient + } + + return new ArrayList<>(allTables); + } + + @Override + public org.apache.hadoop.hive.metastore.api.Table getTable(String dbName, String tableName) { + try { + // Try rawHmsClient first (Derby-based 
HMS for Hive tables) + return rawHmsClient.getTable(dbName, tableName); + } catch (Exception e) { + // Table not found in rawHmsClient, try icebergHmsClient + } + + try { + // Try icebergHmsClient (TestHiveMetastore for Iceberg tables) + return icebergHmsClient.getTable(dbName, tableName); + } catch (Exception e) { + throw new RuntimeException("Failed to get table from both HMS instances: " + dbName + "." + tableName, e); + } + } + }; + } + + /** + * Validate that the translated Spark SQL can be parsed by Spark's SQL parser. + * + * @param spark SparkSession instance + * @param coralSparkTranslation CoralSpark translation object + */ + protected boolean validateSparkSql(SparkSession spark, CoralSpark coralSparkTranslation) { + String sql = coralSparkTranslation.getSparkSql(); + try { + spark.sessionState().sqlParser().parsePlan(sql); + return true; + } catch (ParseException e) { + throw new RuntimeException("Validation failed, failed to parse the translated spark sql: ", e); + } + } + + /** + * Validate that the translated Trino SQL can be parsed by Trino's SQL parser. + * + * @param trinoSql Trino SQL string to validate + * @return true if the SQL is valid and can be parsed + */ + protected boolean validateTrinoSql(String trinoSql) { + try { + Statement parsedStatement = TrinoParserDriver.parse(trinoSql); + return parsedStatement != null; + } catch (Exception e) { + throw new RuntimeException("Validation failed, failed to parse the translated Trino SQL: " + trinoSql, e); + } + } + + /** + * Get Coral Spark translation for a given view. 
+ * + * @param db Database name + * @param table Table/view name + * @param hiveMetastoreClient HiveMetastoreClient for Coral + * @return CoralSpark translation object + */ + protected CoralSpark getCoralSparkTranslation(String db, String table, HiveMetastoreClient hiveMetastoreClient) { + final HiveToRelConverter hiveToRelConverter = new HiveToRelConverter(hiveMetastoreClient); + final RelNode rel = hiveToRelConverter.convertView(db, table); + Schema coralSchema = ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema(db, table); + return CoralSpark.create(rel, coralSchema, hiveMetastoreClient); + } +} diff --git a/coral-integration-test/src/test/java/com/linkedin/coral/integration/CoralInteropIntegrationTest.java b/coral-integration-test/src/test/java/com/linkedin/coral/integration/CoralInteropIntegrationTest.java new file mode 100644 index 000000000..b03c7fa99 --- /dev/null +++ b/coral-integration-test/src/test/java/com/linkedin/coral/integration/CoralInteropIntegrationTest.java @@ -0,0 +1,94 @@ +/** + * Copyright 2019-2025 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.coral.integration; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.testng.annotations.Test; + +import com.linkedin.coral.common.HiveMetastoreClient; +import com.linkedin.coral.hive.hive2rel.HiveToRelConverter; +import com.linkedin.coral.spark.CoralSpark; +import com.linkedin.coral.trino.rel2trino.HiveToTrinoConverter; + +import static org.testng.Assert.*; + + +/** + * Sample integration test demonstrating Coral interoperability with Spark, Trino, Iceberg, and Hive Tables/Views. 
+ */ +public class CoralInteropIntegrationTest extends CoralIntegrationTestBase { + + @Test + public void testCreateHiveViewOnIcebergTable() throws Exception { + // Create an Iceberg table using fully qualified name + executeSql("CREATE TABLE IF NOT EXISTS iceberg_catalog.default.test_iceberg_table " + + "(id BIGINT, name STRING, age INT, salary DOUBLE, hire_date TIMESTAMP) " + "USING iceberg"); + + // Insert test data into the Iceberg table + executeSql("INSERT INTO iceberg_catalog.default.test_iceberg_table " + + "SELECT 1L, 'Alice', 30, 75000.0, current_timestamp() UNION ALL " + + "SELECT 2L, 'Bob', 25, 65000.0, current_timestamp() UNION ALL " + + "SELECT 3L, 'Charlie', 35, 85000.0, current_timestamp()"); + + // Create a Hive view on top of the Iceberg table + // The view filters employees with age > 25 and selects specific columns including timestamp + executeSql("USE iceberg_catalog"); + executeSql("CREATE OR REPLACE VIEW spark_catalog.default.iceberg_table_view AS " + + "SELECT id, name, age, hire_date FROM default.test_iceberg_table WHERE age > 25"); + executeSql("USE spark_catalog"); + + // Query the Hive view + Dataset viewResult = spark.sql("SELECT * FROM spark_catalog.default.iceberg_table_view"); + long viewCount = viewResult.count(); + + // Verify the view returns the expected number of rows (2 employees with age > 25) + assertEquals(viewCount, 2, "View should return 2 rows with age > 25"); + + // Verify we can filter on the view + Dataset filteredView = spark.sql("SELECT name FROM spark_catalog.default.iceberg_table_view WHERE age >= 30"); + assertEquals(filteredView.count(), 2, "Should have 2 employees with age >= 30"); + + // Test Coral Spark translation + String db = "default"; + String table = "iceberg_table_view"; + + HiveMetastoreClient hiveMetastoreClient = createCoralHiveMetastoreClient(); + + // Test Spark translation and validation + CoralSpark coralSparkTranslation = getCoralSparkTranslation(db, table, hiveMetastoreClient); + 
assertTrue(validateSparkSql(spark, coralSparkTranslation)); + + // Test Trino translation and validation + // Ideally we run this against a trino server in unit test, like we did for Spark. + // But trino testcontainers require a local docker daemon to spin up which may not be available in all environments. + HiveToTrinoConverter hiveToTrinoConverter = HiveToTrinoConverter.create(hiveMetastoreClient); + String trinoSql = hiveToTrinoConverter.toTrinoSql(db, table); + assertNotNull(trinoSql, "Trino SQL translation should not be null"); + assertTrue(validateTrinoSql(trinoSql), "Trino SQL validation should succeed"); + + RelNode relNode = getRelNode(db, table, hiveMetastoreClient); + assertNotNull(relNode, "RelNode conversion should not be null"); + RelDataType timestampField = relNode.getRowType().getFieldList().stream() + .filter(field -> field.getName().equals("hire_date")).map(field -> field.getType()).findFirst().orElse(null); + + assertNotNull(timestampField, "hire_date field should exist in RelNode"); + assertEquals(timestampField.getSqlTypeName(), SqlTypeName.TIMESTAMP, "hire_date field should be of TIMESTAMP type"); + assertEquals(timestampField.getPrecision(), -1, + "TIMESTAMP field should have precision 6 (microsecond precision) when bug is fixed"); + + // Drop the view after test + executeSql("DROP VIEW IF EXISTS spark_catalog.default.iceberg_table_view"); + executeSql("DROP TABLE IF EXISTS iceberg_catalog.default.test_iceberg_table"); + } + + private RelNode getRelNode(String db, String view, HiveMetastoreClient hiveMetastoreClient) { + return new HiveToRelConverter(hiveMetastoreClient).convertView(db, view); + } +} diff --git a/coral-integration-test/src/test/java/com/linkedin/coral/integration/HiveMetastoreTestBase.java b/coral-integration-test/src/test/java/com/linkedin/coral/integration/HiveMetastoreTestBase.java new file mode 100644 index 000000000..e1bf23fe9 --- /dev/null +++ 
b/coral-integration-test/src/test/java/com/linkedin/coral/integration/HiveMetastoreTestBase.java @@ -0,0 +1,270 @@ +/** + * Copyright 2019-2025 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.coral.integration; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.RetryingHMSHandler; +import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransportFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + + +/** + * Abstract base class for integration tests requiring a standalone Hive Metastore + * backed by a single embedded Derby database for Hive tables only. 
+ */ +public class HiveMetastoreTestBase { + + protected HiveMetaStoreClient metastoreClient; + protected HiveConf hiveConf; + protected Path warehouseDir; + protected Path metastoreDbDir; + protected String hiveMetastoreUri; + private int hiveMetastorePort = 9085; + + // Thrift server components for in-memory HMS + private TServer hiveMetastoreServer; + private ExecutorService hiveExecutorService; + private HiveMetaStore.HMSHandler hiveBaseHandler; + + @BeforeClass + public void setupHiveMetastore() throws Exception { + // Create temporary directories for Hive catalog + warehouseDir = Files.createTempDirectory("hive-warehouse"); + metastoreDbDir = Files.createTempDirectory("hive-metastore-db"); + + // Create HiveConf for Hive catalog + hiveConf = createHiveConf("hive", metastoreDbDir, warehouseDir); + + // Start in-memory HiveMetaStore service (use port 0 for auto-assignment) + hiveMetastorePort = startInMemoryMetastoreService(hiveConf); + + // Set the actual URI after server is started (with dynamic port) + hiveMetastoreUri = "thrift://localhost:" + hiveMetastorePort; + + // Update config with actual URI + hiveConf.set("hive.metastore.uris", hiveMetastoreUri); + + // Only create metastore client if not overridden by subclass + if (shouldCreateMetastoreClient()) { + createHiveMetastoreClient(); + } + } + + /** + * Start an in-memory HiveMetaStore Thrift service. + * This starts a real HMS service listening on a dynamically assigned port, running within the test JVM. 
+ * + * @param conf HiveConf for this metastore + * @return the actual port number that was assigned + */ + private int startInMemoryMetastoreService(HiveConf conf) throws Exception { + // Create server socket with port 0 (auto-assign available port) + TServerSocket socket = new TServerSocket(0); + int port = socket.getServerSocket().getLocalPort(); + + // Set metastore URI in config + conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); + conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"); + + // Create HMS handler + hiveBaseHandler = new HiveMetaStore.HMSHandler("metastore", conf); + IHMSHandler handler = RetryingHMSHandler.getProxy(conf, hiveBaseHandler, false); + + // Create Thrift server + TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket).processor(new TSetIpAddressProcessor<>(handler)) + .transportFactory(new TTransportFactory()).protocolFactory(new TBinaryProtocol.Factory()).minWorkerThreads(2) + .maxWorkerThreads(2); + + hiveMetastoreServer = new TThreadPoolServer(args); + + // Start server in background thread + hiveExecutorService = Executors.newSingleThreadExecutor(); + hiveExecutorService.submit(() -> hiveMetastoreServer.serve()); + + // Wait for the service to start + Thread.sleep(2000); + + System.out.println("Started in-memory HiveMetaStore on port " + port + " for Hive catalog"); + + // Create default database using the started service + createDefaultDatabase(conf); + + return port; + } + + /** + * Create default database in the metastore + */ + private void createDefaultDatabase(HiveConf conf) { + try { + // Create a temporary client just for creating the default database + HiveMetaStoreClient tempClient = new HiveMetaStoreClient(conf); + try { + // Check if default database already exists + tempClient.getDatabase("default"); + } catch (Exception e) { + // Default database doesn't exist, create it + String warehousePath = conf.get("hive.metastore.warehouse.dir"); + Database defaultDb = 
new Database("default", "Default Hive database", warehousePath, null); + tempClient.createDatabase(defaultDb); + System.out.println("Created default database for Hive catalog"); + } finally { + tempClient.close(); + } + } catch (Exception e) { + System.err.println("Failed to create default database for Hive catalog: " + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * Create a HiveConf with embedded metastore configuration. + * + * @param catalogName Name identifier for this catalog (e.g., "hive") + * @param metastoreDir Directory for Derby metastore database + * @param warehouseDir Directory for warehouse + * @return Configured HiveConf instance + */ + protected HiveConf createHiveConf(String catalogName, Path metastoreDir, Path warehouseDir) { + HiveConf conf = new HiveConf(); + + // Each catalog gets its own Derby database (this ensures complete isolation) + conf.set("javax.jdo.option.ConnectionURL", + "jdbc:derby:;databaseName=" + metastoreDir.toAbsolutePath().toString() + "/metastore_db;create=true"); + conf.set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); + + // Set warehouse directory for this catalog + conf.set("hive.metastore.warehouse.dir", warehouseDir.toAbsolutePath().toUri().toString()); + + // Derby/DataNucleus configuration + conf.set("hive.metastore.schema.verification", "false"); + conf.set("datanucleus.schema.autoCreateAll", "true"); + conf.set("hive.metastore.schema.verification.record.version", "false"); + + // Add catalog identifier for logging/debugging + conf.set("hive.metastore.client.identifier", catalogName); + + return conf; + } + + /** + * Override this method to prevent metastore client creation in subclasses + * that use their own metastore access (e.g., through Spark) + */ + protected boolean shouldCreateMetastoreClient() { + return true; + } + + /** + * Create the HiveMetaStoreClient for the Hive catalog. 
+ */ + protected void createHiveMetastoreClient() throws TException { + try { + metastoreClient = new HiveMetaStoreClient(hiveConf); + } catch (MetaException e) { + throw new RuntimeException("Failed to create HiveMetaStoreClient for Hive catalog", e); + } + } + + @AfterClass + public void tearDownHiveMetastore() throws IOException { + // Close Hive metastore client + if (metastoreClient != null) { + try { + metastoreClient.close(); + } catch (Exception e) { + System.err.println("Error closing Hive metastore client: " + e.getMessage()); + } + metastoreClient = null; + } + + // Stop Hive Thrift server + if (hiveMetastoreServer != null) { + hiveMetastoreServer.stop(); + } + if (hiveExecutorService != null) { + hiveExecutorService.shutdown(); + } + if (hiveBaseHandler != null) { + try { + hiveBaseHandler.shutdown(); + } catch (Exception e) { + System.err.println("Error shutting down Hive base handler: " + e.getMessage()); + } + } + + // Shutdown Derby database system-wide + try { + java.sql.DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } catch (java.sql.SQLException e) { + // Derby throws SQLException on successful shutdown + // SQLState 08006 or XJ015 indicates successful shutdown + if (e.getSQLState() != null && (e.getSQLState().equals("XJ015") || e.getSQLState().equals("08006"))) { + // Expected - Derby shut down successfully + } else { + System.err.println("Derby shutdown warning (SQLState " + e.getSQLState() + "): " + e.getMessage()); + } + } + + // Wait for Derby to fully release locks + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Clean up temporary directories + if (warehouseDir != null && Files.exists(warehouseDir)) { + try { + Files.walk(warehouseDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } catch (Exception e) { + System.err.println("Error deleting warehouse dir: " + e.getMessage()); + } + } + + if (metastoreDbDir != null && 
Files.exists(metastoreDbDir)) { + try { + Files.walk(metastoreDbDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } catch (Exception e) { + System.err.println("Error deleting metastore DB dir: " + e.getMessage()); + } + } + } + + /** + * Get the warehouse directory path + */ + protected String getWarehouseDir() { + return warehouseDir.toAbsolutePath().toUri().toString(); + } + + /** + * Get the Hive metastore URI + */ + protected String getHiveMetastoreUri() { + return hiveMetastoreUri; + } +} diff --git a/coral-integration-test/src/test/resources/log4j.properties b/coral-integration-test/src/test/resources/log4j.properties new file mode 100644 index 000000000..d898b7d95 --- /dev/null +++ b/coral-integration-test/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# Root logger option +log4j.rootLogger=WARN, console + +# Direct log messages to console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# Reduce Spark logging +log4j.logger.org.apache.spark=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.hive=WARN +log4j.logger.org.apache.iceberg=WARN + +# Reduce DataNucleus logging +log4j.logger.DataNucleus=ERROR +log4j.logger.Datastore=ERROR +log4j.logger.Datastore.Schema=ERROR + +# Reduce Derby logging +log4j.logger.org.apache.derby=ERROR + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index f6e70d1f4..a8f6d3b0c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -5,6 +5,8 @@ def versions = [ 'gson': '2.9.0', 'hadoop': '2.7.0', 'hive': '1.2.2', + 'hive2': '2.0.1', + 'iceberg': '1.3.1', 'ivy': '2.5.1', 'jetbrains': '16.0.2', 'jline': '0.9.94', @@ -31,6 +33,16 @@ ext.deps = [ 'hive-metastore': "org.apache.hive:hive-metastore:${versions['hive']}", 'hive-exec-core': 
"org.apache.hive:hive-exec:${versions['hive']}:core" ], + 'hive2':[ + 'hive-metastore': "org.apache.hive:hive-metastore:${versions['hive2']}", + 'hive-exec-core': "org.apache.hive:hive-exec:${versions['hive2']}:core", + 'hive-standalone-metastore': "org.apache.hive:hive-standalone-metastore:${versions['hive2']}" + ], + 'iceberg': [ + 'iceberg-core': "org.apache.iceberg:iceberg-core:${versions['iceberg']}", + 'iceberg-spark': "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:${versions['iceberg']}", + 'iceberg-hive-metastore': "org.apache.iceberg:iceberg-hive-metastore:${versions['iceberg']}" + ], 'ivy': "org.apache.ivy:ivy:${versions['ivy']}", 'jetbrains': "org.jetbrains:annotations:${versions['jetbrains']}", 'jline': "jline:jline:${versions['jline']}", diff --git a/settings.gradle b/settings.gradle index cfe75ca1b..0711726f0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,6 +3,7 @@ def modules = [ 'coral-dbt', 'coral-hive', 'coral-incremental', + 'coral-integration-test', 'coral-pig', 'coral-spark', 'coral-schema',