-
Notifications
You must be signed in to change notification settings - Fork 200
Integrate Coral Type System for Hive Tables #563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
95b86ff
5ea6754
0c4b97c
5ac359f
080722c
eb470f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| /** | ||
| * Copyright 2017-2023 LinkedIn Corporation. All rights reserved. | ||
| * Copyright 2017-2025 LinkedIn Corporation. All rights reserved. | ||
| * Licensed under the BSD-2 Clause license. | ||
| * See LICENSE in the project root for license information. | ||
| */ | ||
|
|
@@ -39,6 +39,11 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import com.linkedin.coral.common.types.CoralDataType; | ||
| import com.linkedin.coral.common.types.CoralTypeToRelDataTypeConverter; | ||
| import com.linkedin.coral.common.types.StructField; | ||
| import com.linkedin.coral.common.types.StructType; | ||
|
|
||
|
|
||
| /** | ||
| * Adaptor class from Hive {@link org.apache.hadoop.hive.metastore.api.Table} representation to | ||
|
|
@@ -134,12 +139,70 @@ private void checkDaliTable() { | |
| // Preconditions.checkState(isDaliTable()); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the row type (schema) for this table. | ||
| * | ||
| * Two conversion paths are supported: | ||
| * 1. Two-stage (preferred): Hive → Coral → Calcite | ||
| * 2. Direct (legacy): Hive → Calcite (for backward compatibility) | ||
| * | ||
| * The two-stage conversion enables using Coral type system as an intermediary, | ||
| * allowing better type system unification and testing. | ||
| * | ||
| * @param typeFactory Calcite type factory | ||
| * @return RelDataType representing the table schema | ||
| */ | ||
| @Override | ||
| public RelDataType getRowType(RelDataTypeFactory typeFactory) { | ||
| // Use two-stage conversion if HiveCoralTable is available | ||
| try { | ||
| return getRowTypeViaCoralTypeSystem(typeFactory); | ||
| } catch (Exception e) { | ||
| // Fall back to direct conversion if two-stage conversion fails | ||
| LOG.warn("Two-stage type conversion failed for table {}, falling back to direct conversion. Error: {}", | ||
| hiveTable.getTableName(), e.getMessage(), e); | ||
| return getRowTypeDirectConversion(typeFactory); | ||
aastha25 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Two-stage conversion: Hive → Coral → Calcite. | ||
| * This is the preferred path when using CoralCatalog. | ||
| */ | ||
| private RelDataType getRowTypeViaCoralTypeSystem(RelDataTypeFactory typeFactory) { | ||
|
||
| // Stage 1: Hive → Coral | ||
| CoralDataType coralSchema = getCoralSchema(); | ||
|
|
||
| // Stage 2: Coral → Calcite | ||
| if (!(coralSchema instanceof StructType)) { | ||
| throw new IllegalStateException("Expected StructType from getCoralSchema(), got: " + coralSchema.getClass()); | ||
| } | ||
|
|
||
| StructType structType = (StructType) coralSchema; | ||
| List<StructField> fields = structType.getFields(); | ||
|
|
||
| List<RelDataType> fieldTypes = new ArrayList<>(fields.size()); | ||
| List<String> fieldNames = new ArrayList<>(fields.size()); | ||
|
|
||
| for (StructField field : fields) { | ||
| fieldNames.add(field.getName()); | ||
| RelDataType fieldType = CoralTypeToRelDataTypeConverter.convert(field.getType(), typeFactory); | ||
| fieldTypes.add(fieldType); | ||
| } | ||
|
|
||
| return typeFactory.createStructType(fieldTypes, fieldNames); | ||
| } | ||
|
|
||
| /** | ||
| * Direct conversion: Hive → Calcite. | ||
| * This is the legacy path for backward compatibility. | ||
| */ | ||
| private RelDataType getRowTypeDirectConversion(RelDataTypeFactory typeFactory) { | ||
| final List<FieldSchema> cols = getColumns(); | ||
| final List<RelDataType> fieldTypes = new ArrayList<>(cols.size()); | ||
| final List<String> fieldNames = new ArrayList<>(cols.size()); | ||
| final Iterable<FieldSchema> allCols = Iterables.concat(cols, hiveTable.getPartitionKeys()); | ||
|
|
||
| allCols.forEach(col -> { | ||
| final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(col.getType()); | ||
| final RelDataType relType = TypeConverter.convert(typeInfo, typeFactory); | ||
|
|
@@ -153,6 +216,40 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { | |
| return typeFactory.createStructType(fieldTypes, fieldNames); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the table schema in Coral type system. | ||
| * This includes both regular columns (from StorageDescriptor) and partition columns. | ||
| * Converts Hive TypeInfo to Coral types using HiveToCoralTypeConverter. | ||
| * | ||
| * @return StructType representing the full table schema (columns + partitions) | ||
| */ | ||
| public CoralDataType getCoralSchema() { | ||
| final List<FieldSchema> cols = getColumns(); | ||
| final List<StructField> fields = new ArrayList<>(); | ||
| final List<String> fieldNames = new ArrayList<>(); | ||
|
|
||
| // Combine regular columns and partition keys (same as HiveTable.getRowType) | ||
| final Iterable<FieldSchema> allCols = Iterables.concat(cols, hiveTable.getPartitionKeys()); | ||
|
|
||
| for (FieldSchema col : allCols) { | ||
| final String colName = col.getName(); | ||
|
|
||
| // Skip duplicate columns (partition keys might overlap with regular columns) | ||
| if (!fieldNames.contains(colName)) { | ||
| // Convert Hive type string to TypeInfo, then to CoralDataType | ||
| final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(col.getType()); | ||
| final CoralDataType coralType = HiveToCoralTypeConverter.convert(typeInfo); | ||
|
|
||
| fields.add(StructField.of(colName, coralType)); | ||
| fieldNames.add(colName); | ||
| } | ||
| } | ||
|
|
||
| // Return struct type representing the table schema | ||
| // Table-level struct is nullable (Hive convention) | ||
| return StructType.of(fields, true); | ||
| } | ||
|
|
||
| private List<FieldSchema> getColumns() { | ||
| StorageDescriptor sd = hiveTable.getSd(); | ||
| String serDeLib = getSerializationLib(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.