Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions coral-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ dependencies {
}

compile deps.'hadoop'.'hadoop-common'

// LinkedIn Iceberg dependencies
compile deps.'linkedin-iceberg'.'iceberg-api'
compile deps.'linkedin-iceberg'.'iceberg-core'
compile(deps.'linkedin-iceberg'.'iceberg-hive-metastore') {
exclude group: 'org.apache.hive', module: 'hive-metastore'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -17,46 +17,67 @@
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;

import com.linkedin.coral.common.catalog.CoralCatalog;
import com.linkedin.coral.common.catalog.Dataset;
import com.linkedin.coral.common.catalog.HiveDataset;
import com.linkedin.coral.common.catalog.IcebergDataset;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.linkedin.coral.common.catalog.TableType.VIEW;


/**
* Adaptor from Hive catalog providing database and table names
* to Calcite {@link Schema}
* Adaptor from catalog providing database and table names to Calcite {@link Schema}.
* Uses CoralCatalog to provide unified access to different table formats
* (Hive, Iceberg, etc.).
*/
public class HiveDbSchema implements Schema {

public static final String DEFAULT_DB = "default";

private final HiveMetastoreClient msc;
private final CoralCatalog catalog;
private final String dbName;

/**
 * Creates a schema adaptor for a single database backed by the given catalog.
 *
 * @param catalog Coral catalog used to resolve tables in this database; must not be null
 * @param dbName name of the database this schema represents; must not be null
 */
HiveDbSchema(@Nonnull CoralCatalog catalog, @Nonnull String dbName) {
this.catalog = checkNotNull(catalog);
this.dbName = checkNotNull(dbName);
}

/**
* Constructor for backward compatibility with HiveMetastoreClient.
*/
HiveDbSchema(@Nonnull HiveMetastoreClient msc, @Nonnull String dbName) {
checkNotNull(msc);
checkNotNull(dbName);
this.msc = msc;
this.dbName = dbName;
this((CoralCatalog) checkNotNull(msc), checkNotNull(dbName));
}

@Override
public Table getTable(String name) {
org.apache.hadoop.hive.metastore.api.Table table = msc.getTable(dbName, name);
if (table == null) {
// Get unified Dataset from CoralCatalog
Dataset dataset = catalog.getDataset(dbName, name);
if (dataset == null) {
return null;
}
org.apache.hadoop.hive.metastore.TableType tableType =
Enum.valueOf(org.apache.hadoop.hive.metastore.TableType.class, table.getTableType());
switch (tableType) {
case VIRTUAL_VIEW:
return new HiveViewTable(table, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
default:
return new HiveTable(table);

// Dispatch based on Dataset implementation type
if (dataset instanceof IcebergDataset) {
return new IcebergTable((IcebergDataset) dataset);
} else if (dataset instanceof HiveDataset) {
HiveDataset hiveDataset = (HiveDataset) dataset;
// Check if it's a view
if (hiveDataset.tableType() == VIEW) {
return new HiveViewTable(hiveDataset, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
} else {
return new HiveTable(hiveDataset);
}
}

// Unknown dataset type - return null
return null;
}

@Override
public Set<String> getTableNames() {
return ImmutableSet.copyOf(msc.getAllTables(dbName));
return ImmutableSet.copyOf(catalog.getAllDatasets(dbName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -10,14 +10,97 @@
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;

import com.linkedin.coral.common.catalog.CoralCatalog;
import com.linkedin.coral.common.catalog.Dataset;
import com.linkedin.coral.common.catalog.DatasetConverter;

public interface HiveMetastoreClient {

/**
* Interface for accessing the Hive Metastore.
* This interface extends {@link CoralCatalog} to provide unified Dataset access
* while maintaining backward compatibility with existing Hive-specific methods.
*
* Implementations of this interface handle connections to Hive metastore
* and provide access to database and table metadata.
*/
public interface HiveMetastoreClient extends CoralCatalog {

/**
* Retrieves all database names from the metastore.
*
* @return List of database names
*/
List<String> getAllDatabases();

/**
* Retrieves database metadata by name.
*
* @param dbName Database name
* @return Database object, or null if not found
*/
Database getDatabase(String dbName);

/**
* Retrieves all table names in a database.
*
* @param dbName Database name
* @return List of table names
*/
List<String> getAllTables(String dbName);

/**
* Retrieves a table by database and table name.
*
* @param dbName Database name
* @param tableName Table name
* @return Hive Table object, or null if not found
*/
Table getTable(String dbName, String tableName);

/**
 * Checks whether the given database exists in the metastore.
 * The default implementation simply asks {@link #getDatabase(String)} and
 * treats a non-null result as existence.
 *
 * @param dbName Database name
 * @return true if the database is present, false otherwise
 */
@Override
default boolean namespaceExists(String dbName) {
  return getDatabase(dbName) != null;
}

/**
 * Looks up a table and exposes it through the unified {@link Dataset} abstraction.
 *
 * The default implementation fetches the raw Hive table via
 * {@link #getTable(String, String)} and, when found, converts it with
 * {@link DatasetConverter#autoConvert(Table)}.
 *
 * @param dbName Database name
 * @param tableName Table name
 * @return the converted Dataset, or null if no such table exists
 */
@Override
default Dataset getDataset(String dbName, String tableName) {
  final Table hiveTable = getTable(dbName, tableName);
  return hiveTable == null ? null : DatasetConverter.autoConvert(hiveTable);
}

/**
 * Retrieves the names of all datasets (tables) in a database.
 * The default implementation delegates directly to {@link #getAllTables(String)},
 * since every Hive table name is also a dataset name in the unified catalog view.
 *
 * @param dbName Database name
 * @return List of dataset (table) names; semantics of a missing database
 *         follow {@link #getAllTables(String)} — TODO confirm whether that
 *         returns an empty list or null for unknown databases
 */
@Override
default List<String> getAllDatasets(String dbName) {
return getAllTables(dbName);
}

// Note: getAllDatabases() already satisfies CoralCatalog.getAllDatabases()
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -17,7 +17,8 @@
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;
import org.apache.hadoop.hive.metastore.api.Database;

import com.linkedin.coral.common.catalog.CoralCatalog;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -26,20 +27,33 @@
* Adaptor from Hive catalog providing database and table names
* to Calcite {@link Schema}. This class represents the "root" schema
* that holds all hive databases as subschema and no tables.
*
* Uses CoralCatalog for unified access to different table formats
* (Hive, Iceberg, etc.).
*/
public class HiveSchema implements Schema {

public static final String ROOT_SCHEMA = "hive";
public static final String DEFAULT_DB = "default";

private final HiveMetastoreClient msc;
private final CoralCatalog catalog;

/**
 * Creates the root Hive schema using a {@link CoralCatalog} to read catalog
 * information, enabling unified access to multiple table formats.
 *
 * @param catalog Coral catalog providing unified access to tables; must not be null
 */
public HiveSchema(@Nonnull CoralCatalog catalog) {
this.catalog = checkNotNull(catalog);
}

/**
* Create HiveSchema using input metastore client to read hive catalog
* Create HiveSchema using HiveMetastoreClient (backward compatibility).
*
* @param msc Hive metastore client
*/
public HiveSchema(@Nonnull HiveMetastoreClient msc) {
this.msc = checkNotNull(msc);
this((CoralCatalog) checkNotNull(msc));
}

/**
Expand Down Expand Up @@ -79,13 +93,16 @@ public Set<String> getFunctionNames() {

@Override
public Schema getSubSchema(String name) {
Database database = msc.getDatabase(name);
return (database == null) ? null : new HiveDbSchema(msc, database.getName());
// Check if namespace (database) exists using the catalog API
if (!catalog.namespaceExists(name)) {
return null;
}
return new HiveDbSchema(catalog, name);
}

@Override
public Set<String> getSubSchemaNames() {
List<String> dbNames = msc.getAllDatabases();
List<String> dbNames = catalog.getAllDatabases();
return ImmutableSet.copyOf(dbNames);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -39,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linkedin.coral.common.catalog.HiveDataset;


/**
* Adaptor class from Hive {@link org.apache.hadoop.hive.metastore.api.Table} representation to
Expand Down Expand Up @@ -88,6 +90,15 @@ public HiveTable(org.apache.hadoop.hive.metastore.api.Table hiveTable) {
this.hiveTable = hiveTable;
}

/**
 * Creates a Calcite table adaptor backed by a {@link HiveDataset} obtained
 * from the unified catalog.
 *
 * @param dataset HiveDataset from the catalog; must not be null
 */
public HiveTable(HiveDataset dataset) {
  this.hiveTable = Preconditions.checkNotNull(dataset).getHiveTable();
}

/**
* Get dali function params from table TBLPROPERTIES clause parameters.
* The 'functions' parameter in TBLPROPERTIES clause is a whitespace-separated list of function base name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -15,6 +15,7 @@

import com.linkedin.coral.com.google.common.base.Throwables;
import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.catalog.HiveDataset;

import static org.apache.calcite.sql.type.SqlTypeName.*;

Expand All @@ -37,6 +38,17 @@ public HiveViewTable(Table hiveTable, List<String> schemaPath) {
this.schemaPath = schemaPath;
}

/**
 * Creates a Calcite view-table adaptor backed by a {@link HiveDataset}
 * obtained from the unified catalog. Delegates null-checking and table
 * extraction to the superclass constructor.
 *
 * @param dataset HiveDataset from the catalog
 * @param schemaPath Calcite schema path used to resolve the view's definition
 */
public HiveViewTable(HiveDataset dataset, List<String> schemaPath) {
super(dataset);
this.schemaPath = schemaPath;
}

@Override
public RelNode toRel(RelOptTable.ToRelContext relContext, RelOptTable relOptTable) {
try {
Expand Down
Loading
Loading