Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions coral-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ dependencies {
}

compile deps.'hadoop'.'hadoop-common'

// LinkedIn Iceberg dependencies
compile deps.'linkedin-iceberg'.'iceberg-api'
compile deps.'linkedin-iceberg'.'iceberg-core'
compile(deps.'linkedin-iceberg'.'iceberg-hive-metastore') {
exclude group: 'org.apache.hive', module: 'hive-metastore'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -17,46 +17,86 @@
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;

import com.linkedin.coral.common.catalog.CoralCatalog;
import com.linkedin.coral.common.catalog.CoralTable;
import com.linkedin.coral.common.catalog.HiveCoralTable;
import com.linkedin.coral.common.catalog.IcebergCoralTable;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.linkedin.coral.common.catalog.TableType.VIEW;


/**
* Adaptor from Hive catalog providing database and table names
* to Calcite {@link Schema}
* Adaptor from catalog providing database and table names to Calcite {@link Schema}.
* Can use either CoralCatalog for unified access or HiveMetastoreClient for Hive-specific access.
*/
public class HiveDbSchema implements Schema {

public static final String DEFAULT_DB = "default";

private final CoralCatalog coralCatalog;
private final HiveMetastoreClient msc;
private final String dbName;

HiveDbSchema(@Nonnull HiveMetastoreClient msc, @Nonnull String dbName) {
checkNotNull(msc);
checkNotNull(dbName);
/**
* Constructor for HiveDbSchema. Exactly one of coralCatalog or msc must be non-null.
*
* @param coralCatalog Coral catalog for unified access (can be null if msc is provided)
* @param msc Hive metastore client for Hive-specific access (can be null if coralCatalog is provided)
* @param dbName Database name (must not be null)
*/
HiveDbSchema(CoralCatalog coralCatalog, HiveMetastoreClient msc, @Nonnull String dbName) {
this.coralCatalog = coralCatalog;
this.msc = msc;
this.dbName = dbName;
this.dbName = checkNotNull(dbName);
}

@Override
public Table getTable(String name) {
org.apache.hadoop.hive.metastore.api.Table table = msc.getTable(dbName, name);
if (table == null) {
if (coralCatalog != null) {
// Use CoralCatalog for unified table access
CoralTable coralTable = coralCatalog.getTable(dbName, name);
if (coralTable == null) {
return null;
}

// Dispatch based on CoralTable implementation type
if (coralTable instanceof IcebergCoralTable) {
return new IcebergTable((IcebergCoralTable) coralTable);
} else if (coralTable instanceof HiveCoralTable) {
HiveCoralTable hiveCoralTable = (HiveCoralTable) coralTable;
// Check if it's a view
if (hiveCoralTable.tableType() == VIEW) {
return new HiveViewTable(hiveCoralTable, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
} else {
return new HiveTable(hiveCoralTable);
}
}
return null;
}
org.apache.hadoop.hive.metastore.TableType tableType =
Enum.valueOf(org.apache.hadoop.hive.metastore.TableType.class, table.getTableType());
switch (tableType) {
case VIRTUAL_VIEW:
return new HiveViewTable(table, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
default:
return new HiveTable(table);
} else {
// Use HiveMetastoreClient for Hive-specific access
org.apache.hadoop.hive.metastore.api.Table hiveTable = msc.getTable(dbName, name);
if (hiveTable == null) {
return null;
}

// Wrap in HiveCoralTable and dispatch
HiveCoralTable hiveCoralTable = new HiveCoralTable(hiveTable);
if (hiveCoralTable.tableType() == VIEW) {
return new HiveViewTable(hiveCoralTable, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
} else {
return new HiveTable(hiveCoralTable);
}
}
}

@Override
public Set<String> getTableNames() {
return ImmutableSet.copyOf(msc.getAllTables(dbName));
if (coralCatalog != null) {
return ImmutableSet.copyOf(coralCatalog.getAllTables(dbName));
} else {
return ImmutableSet.copyOf(msc.getAllTables(dbName));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -11,13 +11,48 @@
import org.apache.hadoop.hive.metastore.api.Table;


/**
* Interface for accessing Hive Metastore.
* Implementations of this interface handle connections to Hive metastore
* and provide access to database and table metadata.
*
* @deprecated Use {@link com.linkedin.coral.common.catalog.CoralCatalog} instead.
* CoralCatalog provides a unified interface supporting multiple table formats
* (Hive, Iceberg, etc.) while this interface is Hive-specific.
* Existing code using HiveMetastoreClient continues to work.
*/
@Deprecated
public interface HiveMetastoreClient {

/**
* Retrieves all database names from the metastore.
*
* @return List of database names
*/
List<String> getAllDatabases();

/**
* Retrieves database metadata by name.
*
* @param dbName Database name
* @return Database object, or null if not found
*/
Database getDatabase(String dbName);

/**
* Retrieves all table names in a database.
*
* @param dbName Database name
* @return List of table names
*/
List<String> getAllTables(String dbName);

/**
* Retrieves a table by database and table name.
*
* @param dbName Database name
* @param tableName Table name
* @return Hive Table object, or null if not found
*/
Table getTable(String dbName, String tableName);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -17,6 +17,15 @@
import org.slf4j.LoggerFactory;


/**
* Adapter implementation of {@link HiveMetastoreClient} that wraps
* Hadoop's {@link IMetaStoreClient}.
*
* @deprecated Use {@link com.linkedin.coral.common.catalog.CoralCatalog} instead.
* This class is Hive-specific. For multi-format support (Hive, Iceberg),
* implement CoralCatalog directly. Existing code continues to work.
*/
@Deprecated
public class HiveMscAdapter implements HiveMetastoreClient {

private final static Logger LOG = LoggerFactory.getLogger(HiveMscAdapter.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common;

import java.util.Collection;
import java.util.List;
import java.util.Set;

import javax.annotation.Nonnull;
Expand All @@ -17,7 +16,8 @@
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;
import org.apache.hadoop.hive.metastore.api.Database;

import com.linkedin.coral.common.catalog.CoralCatalog;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -26,20 +26,36 @@
* Adaptor from Hive catalog providing database and table names
* to Calcite {@link Schema}. This class represents the "root" schema
* that holds all hive databases as subschema and no tables.
*
* Can use either CoralCatalog for unified access to different table formats
* or HiveMetastoreClient for Hive-specific access.
*/
public class HiveSchema implements Schema {

public static final String ROOT_SCHEMA = "hive";
public static final String DEFAULT_DB = "default";

private final CoralCatalog coralCatalog;
private final HiveMetastoreClient msc;

/**
* Create HiveSchema using input metastore client to read hive catalog
* Create HiveSchema using CoralCatalog to read catalog information.
*
* @param coralCatalog Coral catalog providing unified access to tables
*/
public HiveSchema(@Nonnull CoralCatalog coralCatalog) {
this.coralCatalog = checkNotNull(coralCatalog);
this.msc = null;
}

/**
* Create HiveSchema using HiveMetastoreClient (backward compatibility).
*
* @param msc Hive metastore client
*/
public HiveSchema(@Nonnull HiveMetastoreClient msc) {
this.msc = checkNotNull(msc);
this.coralCatalog = null;
}

/**
Expand Down Expand Up @@ -79,14 +95,27 @@ public Set<String> getFunctionNames() {

@Override
public Schema getSubSchema(String name) {
Database database = msc.getDatabase(name);
return (database == null) ? null : new HiveDbSchema(msc, database.getName());
// Check if database exists
if (coralCatalog != null) {
if (!coralCatalog.namespaceExists(name)) {
return null;
}
return new HiveDbSchema(coralCatalog, null, name);
} else {
if (msc.getDatabase(name) == null) {
return null;
}
return new HiveDbSchema(null, msc, name);
}
}

@Override
public Set<String> getSubSchemaNames() {
List<String> dbNames = msc.getAllDatabases();
return ImmutableSet.copyOf(dbNames);
if (coralCatalog != null) {
return ImmutableSet.copyOf(coralCatalog.getAllNamespaces());
} else {
return ImmutableSet.copyOf(msc.getAllDatabases());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -39,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linkedin.coral.common.catalog.HiveCoralTable;


/**
* Adaptor class from Hive {@link org.apache.hadoop.hive.metastore.api.Table} representation to
Expand Down Expand Up @@ -88,6 +90,15 @@ public HiveTable(org.apache.hadoop.hive.metastore.api.Table hiveTable) {
this.hiveTable = hiveTable;
}

/**
* Constructor accepting HiveCoralTable for unified catalog integration.
* @param coralTable HiveCoralTable from catalog
*/
public HiveTable(HiveCoralTable coralTable) {
Preconditions.checkNotNull(coralTable);
this.hiveTable = coralTable.getHiveTable();
}

/**
* Get dali function params from table TBLPROPERTIES clause parameters.
* The 'functions' parameter in TBLPROPERTIES clause is a whitespace-separated list of function base name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -15,6 +15,7 @@

import com.linkedin.coral.com.google.common.base.Throwables;
import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.catalog.HiveCoralTable;

import static org.apache.calcite.sql.type.SqlTypeName.*;

Expand All @@ -37,6 +38,17 @@ public HiveViewTable(Table hiveTable, List<String> schemaPath) {
this.schemaPath = schemaPath;
}

/**
* Constructor accepting HiveCoralTable for unified catalog integration.
*
* @param coralTable HiveCoralTable from catalog
* @param schemaPath Calcite schema path
*/
public HiveViewTable(HiveCoralTable coralTable, List<String> schemaPath) {
super(coralTable);
this.schemaPath = schemaPath;
}

@Override
public RelNode toRel(RelOptTable.ToRelContext relContext, RelOptTable relOptTable) {
try {
Expand Down
Loading