 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-
+import com.linkedin.coral.common.types.CoralDataType;
+import com.linkedin.coral.common.types.CoralTypeToRelDataTypeConverter;
+import com.linkedin.coral.common.types.StructField;
+import com.linkedin.coral.common.types.StructType;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.linq4j.Enumerable;
@@ -134,12 +137,70 @@ private void checkDaliTable() {
     // Preconditions.checkState(isDaliTable());
   }
 
+  /**
+   * Returns the row type (schema) for this table.
+   *
+   * Two conversion paths are supported:
+   * 1. Two-stage (preferred): Hive → Coral → Calcite
+   * 2. Direct (legacy): Hive → Calcite, kept for backward compatibility
+   *
+   * The two-stage conversion uses the Coral type system as an intermediary,
+   * allowing better type system unification and testing.
+   *
+   * @param typeFactory Calcite type factory
+   * @return RelDataType representing the table schema
+   */
   @Override
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    // Try the two-stage (Hive → Coral → Calcite) conversion first
+    try {
+      return getRowTypeViaCoralTypeSystem(typeFactory);
+    } catch (Exception e) {
+      // Fall back to the direct conversion if the two-stage conversion fails
+      LOG.warn("Two-stage type conversion failed for table {}, falling back to direct conversion. Error: {}",
+          hiveTable.getTableName(), e.getMessage(), e);
+      return getRowTypeDirectConversion(typeFactory);
+    }
+  }
+
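As a point of reference, here is a minimal sketch of how the two paths relate for a single Hive type. It uses only the conversion calls that appear in this diff (HiveToCoralTypeConverter.convert, CoralTypeToRelDataTypeConverter.convert, TypeConverter.convert); the standalone SqlTypeFactoryImpl is an assumption made to keep the snippet self-contained, and the equivalence of the two results is the property the fallback in getRowType relies on.

```java
// Sketch only, not part of this change. Assumes the imports added in this diff plus
// Calcite's SqlTypeFactoryImpl / RelDataTypeSystem for a standalone type factory.
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
TypeInfo hiveType = TypeInfoUtils.getTypeInfoFromTypeString("map<string,int>");

// Two-stage (preferred): Hive → Coral → Calcite
CoralDataType coralType = HiveToCoralTypeConverter.convert(hiveType);
RelDataType viaCoral = CoralTypeToRelDataTypeConverter.convert(coralType, typeFactory);

// Direct (legacy): Hive → Calcite
RelDataType direct = TypeConverter.convert(hiveType, typeFactory);

// Both should describe the same SQL type; getRowType relies on that equivalence
// when it falls back from the two-stage path to the direct path.
```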
+  /**
+   * Two-stage conversion: Hive → Coral → Calcite.
+   * This is the preferred path when using CoralCatalog.
+   */
+  private RelDataType getRowTypeViaCoralTypeSystem(RelDataTypeFactory typeFactory) {
+    // Stage 1: Hive → Coral
+    CoralDataType coralSchema = getCoralSchema();
+
+    // Stage 2: Coral → Calcite
+    if (!(coralSchema instanceof StructType)) {
+      throw new IllegalStateException("Expected StructType from getCoralSchema(), got: " + coralSchema.getClass());
+    }
+
+    StructType structType = (StructType) coralSchema;
+    List<StructField> fields = structType.getFields();
+
+    List<RelDataType> fieldTypes = new ArrayList<>(fields.size());
+    List<String> fieldNames = new ArrayList<>(fields.size());
+
+    for (StructField field : fields) {
+      fieldNames.add(field.getName());
+      RelDataType fieldType = CoralTypeToRelDataTypeConverter.convert(field.getType(), typeFactory);
+      fieldTypes.add(fieldType);
+    }
+
+    return typeFactory.createStructType(fieldTypes, fieldNames);
+  }
+
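The javadoc above motivates the Coral intermediary with type-system unification and testing; the sketch below shows what constructing a Coral schema without a Hive table could look like. StructField.of, StructType.of, and the two converters are used exactly as elsewhere in this diff, but whether CoralTypeToRelDataTypeConverter accepts a top-level struct (rather than only field types, as above) is an assumption.

```java
// Sketch only: build a Coral schema directly (e.g., in a test) and convert it to Calcite.
// typeFactory is assumed as in the previous sketch.
StructType schema = StructType.of(
    ImmutableList.of(
        StructField.of("id",
            HiveToCoralTypeConverter.convert(TypeInfoUtils.getTypeInfoFromTypeString("bigint"))),
        StructField.of("tags",
            HiveToCoralTypeConverter.convert(TypeInfoUtils.getTypeInfoFromTypeString("array<string>")))),
    true); // nullable at the top level, matching getCoralSchema()

// Assumption: the converter handles a top-level StructType the same way it handles field types.
RelDataType rowType = CoralTypeToRelDataTypeConverter.convert(schema, typeFactory);
```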
+  /**
+   * Direct conversion: Hive → Calcite.
+   * This is the legacy path for backward compatibility.
+   */
+  private RelDataType getRowTypeDirectConversion(RelDataTypeFactory typeFactory) {
     final List<FieldSchema> cols = getColumns();
     final List<RelDataType> fieldTypes = new ArrayList<>(cols.size());
     final List<String> fieldNames = new ArrayList<>(cols.size());
     final Iterable<FieldSchema> allCols = Iterables.concat(cols, hiveTable.getPartitionKeys());
+
     allCols.forEach(col -> {
       final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(col.getType());
       final RelDataType relType = TypeConverter.convert(typeInfo, typeFactory);
@@ -153,6 +214,41 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
     return typeFactory.createStructType(fieldTypes, fieldNames);
   }
 
+  /**
+   * Returns the table schema in the Coral type system.
+   * This includes both regular columns (from the StorageDescriptor) and partition columns.
+   * Converts Hive TypeInfo to Coral types using HiveToCoralTypeConverter.
+   *
+   * @return StructType representing the full table schema (columns + partitions)
+   */
+  @Override
+  public CoralDataType getCoralSchema() {
+    final List<FieldSchema> cols = getColumns();
+    final List<StructField> fields = new ArrayList<>();
+    final List<String> fieldNames = new ArrayList<>();
+
+    // Combine regular columns and partition keys (same as HiveTable.getRowType)
+    final Iterable<FieldSchema> allCols = Iterables.concat(cols, hiveTable.getPartitionKeys());
+
+    for (FieldSchema col : allCols) {
+      final String colName = col.getName();
+
+      // Skip duplicate columns (partition keys might overlap with regular columns)
+      if (!fieldNames.contains(colName)) {
+        // Convert the Hive type string to TypeInfo, then to CoralDataType
+        final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(col.getType());
+        final CoralDataType coralType = HiveToCoralTypeConverter.convert(typeInfo);
+
+        fields.add(StructField.of(colName, coralType));
+        fieldNames.add(colName);
+      }
+    }
+
+    // The table-level struct is nullable (Hive convention)
+    return StructType.of(fields, true);
+  }
+
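To make the column/partition handling concrete, this is the StructType that getCoralSchema() is expected to produce for a hypothetical table with columns (id bigint, name string) partitioned by (datepartition string); the ordering and the duplicate-skipping follow directly from the loop above.

```java
// Sketch only: expected schema for a hypothetical table (id bigint, name string)
// partitioned by (datepartition string). Regular columns come first, then partition
// keys; a partition key whose name collides with a regular column is skipped.
StructType expected = StructType.of(
    ImmutableList.of(
        StructField.of("id",
            HiveToCoralTypeConverter.convert(TypeInfoUtils.getTypeInfoFromTypeString("bigint"))),
        StructField.of("name",
            HiveToCoralTypeConverter.convert(TypeInfoUtils.getTypeInfoFromTypeString("string"))),
        StructField.of("datepartition",
            HiveToCoralTypeConverter.convert(TypeInfoUtils.getTypeInfoFromTypeString("string")))),
    true);
```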
   private List<FieldSchema> getColumns() {
     StorageDescriptor sd = hiveTable.getSd();
     String serDeLib = getSerializationLib();