Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions coral-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ dependencies {
}

compile deps.'hadoop'.'hadoop-common'

// LinkedIn Iceberg dependencies
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason why this needs to be a plugin. This should integrate with OSS Iceberg too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, I want to bring in the custom shaded distribution of li-iceberg-hive-metastore here. But in general, I feel OK about Coral depending on li-iceberg.

'iceberg-hive-metastore': "com.linkedin.iceberg:iceberg-hive-metastore:${versions['linkedin-iceberg']}:shaded"

compile deps.'linkedin-iceberg'.'iceberg-api'
compile deps.'linkedin-iceberg'.'iceberg-core'
compile(deps.'linkedin-iceberg'.'iceberg-hive-metastore') {
exclude group: 'org.apache.hive', module: 'hive-metastore'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -17,46 +17,86 @@
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;

import com.linkedin.coral.common.catalog.CoralCatalog;
import com.linkedin.coral.common.catalog.CoralTable;
import com.linkedin.coral.common.catalog.HiveCoralTable;
import com.linkedin.coral.common.catalog.IcebergCoralTable;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.linkedin.coral.common.catalog.TableType.VIEW;


/**
* Adaptor from Hive catalog providing database and table names
* to Calcite {@link Schema}
* Adaptor from catalog providing database and table names to Calcite {@link Schema}.
* Can use either CoralCatalog for unified access or HiveMetastoreClient for Hive-specific access.
*/
public class HiveDbSchema implements Schema {

public static final String DEFAULT_DB = "default";

private final CoralCatalog coralCatalog;
private final HiveMetastoreClient msc;
private final String dbName;

HiveDbSchema(@Nonnull HiveMetastoreClient msc, @Nonnull String dbName) {
checkNotNull(msc);
checkNotNull(dbName);
/**
* Constructor for HiveDbSchema. Exactly one of coralCatalog or msc must be non-null.
*
* @param coralCatalog Coral catalog for unified access (can be null if msc is provided)
* @param msc Hive metastore client for Hive-specific access (can be null if coralCatalog is provided)
* @param dbName Database name (must not be null)
*/
HiveDbSchema(CoralCatalog coralCatalog, HiveMetastoreClient msc, @Nonnull String dbName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we introduce the coral classes here, depending only on CoralCatalog, and mark the Hive ones as deprecated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea — please take a look at the new code. I have added new parallel classes here which work with CoralCatalog.

this.coralCatalog = coralCatalog;
this.msc = msc;
this.dbName = dbName;
this.dbName = checkNotNull(dbName);
}

@Override
public Table getTable(String name) {
org.apache.hadoop.hive.metastore.api.Table table = msc.getTable(dbName, name);
if (table == null) {
if (coralCatalog != null) {
// Use CoralCatalog for unified table access
CoralTable coralTable = coralCatalog.getTable(dbName, name);
if (coralTable == null) {
return null;
}

// Dispatch based on CoralTable implementation type
if (coralTable instanceof IcebergCoralTable) {
return new IcebergTable((IcebergCoralTable) coralTable);
} else if (coralTable instanceof HiveCoralTable) {
HiveCoralTable hiveCoralTable = (HiveCoralTable) coralTable;
// Check if it's a view
if (hiveCoralTable.tableType() == VIEW) {
return new HiveViewTable(hiveCoralTable, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
} else {
return new HiveTable(hiveCoralTable);
}
}
return null;
}
org.apache.hadoop.hive.metastore.TableType tableType =
Enum.valueOf(org.apache.hadoop.hive.metastore.TableType.class, table.getTableType());
switch (tableType) {
case VIRTUAL_VIEW:
return new HiveViewTable(table, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
default:
return new HiveTable(table);
} else {
// Use HiveMetastoreClient for Hive-specific access
org.apache.hadoop.hive.metastore.api.Table hiveTable = msc.getTable(dbName, name);
if (hiveTable == null) {
return null;
}

// Wrap in HiveCoralTable and dispatch
HiveCoralTable hiveCoralTable = new HiveCoralTable(hiveTable);
if (hiveCoralTable.tableType() == VIEW) {
return new HiveViewTable(hiveCoralTable, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
} else {
return new HiveTable(hiveCoralTable);
}
}
}

@Override
public Set<String> getTableNames() {
  // Prefer the unified CoralCatalog backend; fall back to the legacy Hive metastore client.
  if (coralCatalog != null) {
    return ImmutableSet.copyOf(coralCatalog.getAllTables(dbName));
  }
  return ImmutableSet.copyOf(msc.getAllTables(dbName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -11,13 +11,48 @@
import org.apache.hadoop.hive.metastore.api.Table;


/**
 * Interface for accessing the Hive Metastore.
 * Implementations of this interface handle connections to the Hive metastore
 * and provide read access to database and table metadata.
 *
 * @deprecated Use {@link com.linkedin.coral.common.catalog.CoralCatalog} instead.
 *             CoralCatalog provides a unified interface supporting multiple table formats
 *             (Hive, Iceberg, etc.) while this interface is Hive-specific.
 *             Existing code using HiveMetastoreClient continues to work.
 */
@Deprecated
public interface HiveMetastoreClient {

/**
 * Retrieves all database names from the metastore.
 *
 * @return List of database names; never null
 */
List<String> getAllDatabases();

/**
 * Retrieves database metadata by name.
 *
 * @param dbName Database name
 * @return Database object, or null if the database does not exist
 */
Database getDatabase(String dbName);

/**
 * Retrieves all table names in a database.
 *
 * @param dbName Database name
 * @return List of table names in the database
 */
List<String> getAllTables(String dbName);

/**
 * Retrieves a table by database and table name.
 *
 * @param dbName Database name
 * @param tableName Table name
 * @return Hive Table object, or null if the table does not exist
 */
Table getTable(String dbName, String tableName);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -17,6 +17,15 @@
import org.slf4j.LoggerFactory;


/**
* Adapter implementation of {@link HiveMetastoreClient} that wraps
* Hadoop's {@link IMetaStoreClient}.
*
* @deprecated Use {@link com.linkedin.coral.common.catalog.CoralCatalog} instead.
* This class is Hive-specific. For multi-format support (Hive, Iceberg),
* implement CoralCatalog directly. Existing code continues to work.
*/
@Deprecated
public class HiveMscAdapter implements HiveMetastoreClient {

private final static Logger LOG = LoggerFactory.getLogger(HiveMscAdapter.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common;

import java.util.Collection;
import java.util.List;
import java.util.Set;

import javax.annotation.Nonnull;
Expand All @@ -17,7 +16,8 @@
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;
import org.apache.hadoop.hive.metastore.api.Database;

import com.linkedin.coral.common.catalog.CoralCatalog;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -26,20 +26,36 @@
* Adaptor from Hive catalog providing database and table names
* to Calcite {@link Schema}. This class represents the "root" schema
* that holds all hive databases as subschema and no tables.
*
* Can use either CoralCatalog for unified access to different table formats
* or HiveMetastoreClient for Hive-specific access.
*/
public class HiveSchema implements Schema {

public static final String ROOT_SCHEMA = "hive";
public static final String DEFAULT_DB = "default";

private final CoralCatalog coralCatalog;
private final HiveMetastoreClient msc;

/**
 * Create HiveSchema using CoralCatalog to read catalog information.
 *
 * @param coralCatalog Coral catalog providing unified access to tables; must not be null
 * @throws NullPointerException if {@code coralCatalog} is null
 */
public HiveSchema(@Nonnull CoralCatalog coralCatalog) {
this.coralCatalog = checkNotNull(coralCatalog);
// msc is intentionally null: exactly one backend (catalog or metastore client) is active.
this.msc = null;
}

/**
 * Create HiveSchema using HiveMetastoreClient (backward compatibility).
 *
 * @param msc Hive metastore client; must not be null
 * @throws NullPointerException if {@code msc} is null
 */
public HiveSchema(@Nonnull HiveMetastoreClient msc) {
this.msc = checkNotNull(msc);
// coralCatalog is intentionally null: exactly one backend (catalog or metastore client) is active.
this.coralCatalog = null;
}

/**
Expand Down Expand Up @@ -79,14 +95,27 @@ public Set<String> getFunctionNames() {

@Override
public Schema getSubSchema(String name) {
  // Resolve the database through whichever backend this schema was constructed with;
  // return null (per Calcite's Schema contract) when the database does not exist.
  if (coralCatalog != null) {
    if (!coralCatalog.namespaceExists(name)) {
      return null;
    }
    return new HiveDbSchema(coralCatalog, null, name);
  }
  if (msc.getDatabase(name) == null) {
    return null;
  }
  return new HiveDbSchema(null, msc, name);
}

@Override
public Set<String> getSubSchemaNames() {
  // Database names come from the unified catalog when present, else from the Hive metastore client.
  if (coralCatalog != null) {
    return ImmutableSet.copyOf(coralCatalog.getAllNamespaces());
  }
  return ImmutableSet.copyOf(msc.getAllDatabases());
}

@Override
Expand Down
35 changes: 15 additions & 20 deletions coral-common/src/main/java/com/linkedin/coral/common/HiveTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.calcite.DataContext;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
Expand All @@ -39,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linkedin.coral.common.catalog.HiveCoralTable;
import com.linkedin.coral.common.types.CoralDataType;
import com.linkedin.coral.common.types.CoralTypeToRelDataTypeConverter;
import com.linkedin.coral.common.types.StructField;
Expand Down Expand Up @@ -93,6 +95,15 @@ public HiveTable(org.apache.hadoop.hive.metastore.api.Table hiveTable) {
this.hiveTable = hiveTable;
}

/**
 * Constructor accepting HiveCoralTable for unified catalog integration.
 *
 * @param coralTable HiveCoralTable from the unified catalog; must not be null
 * @throws NullPointerException if {@code coralTable} is null
 */
public HiveTable(HiveCoralTable coralTable) {
  // checkNotNull returns its argument, so validation and unwrapping are fused in one statement.
  this.hiveTable = Preconditions.checkNotNull(coralTable, "coralTable must not be null").getHiveTable();
}

/**
* Get dali function params from table TBLPROPERTIES clause parameters.
* The 'functions' parameter in TBLPROPERTIES clause is a whitespace-separated list of function base name
Expand Down Expand Up @@ -162,8 +173,8 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
try {
RelDataType coralType = getRowTypeFromCoralType(typeFactory);

// Compare the two type representations
if (!hiveType.equals(coralType)) {
// Compare using structural equality (not reference equality)
if (!RelOptUtil.areRowTypesEqual(hiveType, coralType, false)) {
LOG.warn("Hive and Coral type conversion mismatch for table {}.{}. Hive: {}, Coral: {}", hiveTable.getDbName(),
hiveTable.getTableName(), hiveType, coralType);
}
Expand All @@ -183,26 +194,10 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
*/
private RelDataType getRowTypeFromCoralType(RelDataTypeFactory typeFactory) {
  // Stage 1: Hive schema -> Coral intermediate type. getCoralSchema() is expected to return a
  // StructType for a table row; a ClassCastException here indicates an upstream bug — TODO confirm
  // whether a friendlier IllegalStateException is preferred for this failure mode.
  StructType structType = (StructType) getCoralSchema();

  // Stage 2: Coral type -> Calcite RelDataType via the shared converter.
  return CoralTypeToRelDataTypeConverter.convert(structType, typeFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ private static CoralDataType convertPrimitive(PrimitiveTypeInfo type) {
// Use PRECISION_NOT_SPECIFIED (-1) to match Calcite's behavior
return TimestampType.of(TimestampType.PRECISION_NOT_SPECIFIED, nullable);
case BINARY:
return PrimitiveType.of(CoralTypeKind.BINARY, nullable);
// Hive BINARY is unbounded/variable-length
return BinaryType.of(BinaryType.LENGTH_UNBOUNDED, nullable);
case DECIMAL:
DecimalTypeInfo decimalType = (DecimalTypeInfo) type;
return DecimalType.of(decimalType.precision(), decimalType.scale(), nullable);
Expand Down
Loading