Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Supporting CTAS queries for Hive to Spark query translations #324

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ public RelNode convertView(String hiveDbName, String hiveViewName) {
return toRel(sqlNode);
}

/**
 * Parses the given SQL text into a Calcite {@code SqlNode} without any Hive view
 * context; delegates to {@code toSqlNode(sql, null)}.
 */
// TODO change back to protected once the relevant tests move to the common package
@VisibleForTesting
public SqlNode toSqlNode(String sql) {
return toSqlNode(sql, null);
}
Expand All @@ -161,9 +159,9 @@ public SqlNode processView(String dbName, String tableName) {
return toSqlNode(stringViewExpandedText, table);
}

@VisibleForTesting
protected RelNode toRel(SqlNode sqlNode) {
public RelNode toRel(SqlNode sqlNode) {
RelRoot root = getSqlToRelConverter().convertQuery(sqlNode, true, true);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary empty line?

return standardizeRel(root.rel);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common.calcite;

import org.apache.calcite.sql.SqlNode;


/**
 * Validator for DDL statement parse trees (e.g. CREATE TABLE nodes).
 * Implementations check a DDL {@code SqlNode} and are expected to signal
 * invalid statements; how failures are reported is implementation-defined.
 */
public interface DdlSqlValidator {

/**
 * Validates the given DDL parse-tree node.
 *
 * @param ddlSqlNode the DDL statement's root {@code SqlNode}
 */
void validate(SqlNode ddlSqlNode);
}
Comment on lines +11 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is supposed to call this (external calling code or internally inside some Coral step)? How can we unify it with the existing pipeline for validation? @aastha25 please let me know your thoughts.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common.calcite.sql;

import org.apache.calcite.sql.SqlNode;


/**
* Interface for SqlNodes containing select statements as a child node. Ex: CTAS queries
*/
public interface SqlCommand {
ljfgem marked this conversation as resolved.
Show resolved Hide resolved

SqlNode getSelectQuery();

void setSelectQuery(SqlNode selectQuery);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common.calcite.sql;

import java.util.List;

import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import com.linkedin.coral.javax.annotation.Nonnull;
import com.linkedin.coral.javax.annotation.Nullable;


/**
 * SQL parse tree node representing a Hive {@code CREATE TABLE} statement,
 * optionally carrying a trailing CTAS query.
 *
 * <p>Supported Syntax:
 *
 * <blockquote><code>
 * CREATE TABLE [ IF NOT EXISTS ] name
 * [ROW FORMAT SERDE serde]
 * [ROW FORMAT DELIMITED FIELDS TERMINATED BY rowFormat]
 * [STORED AS fileFormat]
 * [STORED AS INPUTFORMAT inputFormat OUTPUTFORMAT outputFormat]
 * [ AS query ]
 *
 * </code></blockquote>
 *
 * <p>Examples:
 *
 * <ul>
 *   <li><code>CREATE TABLE IF NOT EXISTS sample AS SELECT * FROM tmp</code></li>
 *   <li><code>CREATE TABLE sample STORED AS ORC AS SELECT * FROM tmp</code></li>
 *   <li><code>CREATE TABLE sample STORED AS INPUTFORMAT 'SerDeExampleInputFormat' OUTPUTFORMAT 'SerDeExampleOutputFormat' AS SELECT * FROM tmp</code></li>
 *   <li><code>CREATE TABLE sample ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' AS SELECT * FROM tmp</code></li>
 *   <li><code>CREATE TABLE sample ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' AS SELECT * FROM tmp</code></li>
 * </ul>
 */
public class SqlCreateTable extends SqlCreate implements SqlCommand {
  // Name of the table being created.
  private final SqlIdentifier name;
  // Column definitions (name, type, ...); may be null, e.g. for CTAS where the schema is derived.
  private final @Nullable SqlNodeList columnList;
  // SELECT node for "CREATE TABLE ... AS query"; null when no CTAS query is present.
  private @Nullable SqlNode selectQuery;
  // SerDe literal from "ROW FORMAT SERDE"; null when absent.
  private final @Nullable SqlNode serDe;
  // One element for "STORED AS fmt", two for INPUTFORMAT/OUTPUTFORMAT; null when absent.
  private final @Nullable SqlNodeList fileFormat;
  // Field delimiter from "ROW FORMAT DELIMITED FIELDS TERMINATED BY"; null when absent.
  private final @Nullable SqlCharStringLiteral rowFormat;

  private static final SqlOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);

  /** Creates a SqlCreateTable. */
  public SqlCreateTable(SqlParserPos pos, boolean replace, boolean ifNotExists, @Nonnull SqlIdentifier name,
      @Nullable SqlNodeList columnList, @Nullable SqlNode selectQuery, @Nullable SqlNode serDe,
      @Nullable SqlNodeList fileFormat, @Nullable SqlCharStringLiteral rowFormat) {
    super(OPERATOR, pos, replace, ifNotExists);
    this.name = name;
    this.columnList = columnList;
    this.selectQuery = selectQuery;
    this.serDe = serDe;
    this.fileFormat = fileFormat;
    this.rowFormat = rowFormat;
  }

  @SuppressWarnings("nullness")
  @Override
  public List<SqlNode> getOperandList() {
    return ImmutableNullableList.of(name, columnList, selectQuery, serDe, fileFormat, rowFormat);
  }

  @Override
  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
    writer.keyword("CREATE");
    writer.keyword("TABLE");
    if (ifNotExists) {
      writer.keyword("IF NOT EXISTS");
    }
    name.unparse(writer, leftPrec, rightPrec);
    unparseColumnList(writer);
    unparseStorageClauses(writer);
    unparseSelectQuery(writer);
  }

  /** Writes "(col, col, ...)" when an explicit column list is present. */
  private void unparseColumnList(SqlWriter writer) {
    if (columnList == null) {
      return;
    }
    SqlWriter.Frame columnFrame = writer.startList("(", ")");
    for (SqlNode column : columnList) {
      writer.sep(",");
      column.unparse(writer, 0, 0);
    }
    writer.endList(columnFrame);
  }

  /** Writes the optional SerDe, row-format and file-format clauses, in that order. */
  private void unparseStorageClauses(SqlWriter writer) {
    if (serDe != null) {
      writer.keyword("ROW FORMAT SERDE");
      serDe.unparse(writer, 0, 0);
      writer.newlineAndIndent();
    }
    if (rowFormat != null) {
      writer.keyword("ROW FORMAT DELIMITED FIELDS TERMINATED BY");
      rowFormat.unparse(writer, 0, 0);
      writer.newlineAndIndent();
    }
    if (fileFormat == null) {
      return;
    }
    if (fileFormat.size() == 1) {
      writer.keyword("STORED AS");
      fileFormat.get(0).unparse(writer, 0, 0);
    } else {
      // Two entries: explicit input and output format classes.
      writer.keyword("STORED AS INPUTFORMAT");
      fileFormat.get(0).unparse(writer, 0, 0);
      writer.keyword("OUTPUTFORMAT");
      fileFormat.get(1).unparse(writer, 0, 0);
    }
    writer.newlineAndIndent();
  }

  /** Writes "AS <query>" for CTAS statements. */
  private void unparseSelectQuery(SqlWriter writer) {
    if (selectQuery == null) {
      return;
    }
    writer.keyword("AS");
    writer.newlineAndIndent();
    selectQuery.unparse(writer, 0, 0);
  }

  @Override
  public SqlNode getSelectQuery() {
    return selectQuery;
  }

  @Override
  public void setSelectQuery(SqlNode query) {
    this.selectQuery = query;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common.calcite.sql.util;

import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;

import com.linkedin.coral.common.calcite.sql.SqlCreateTable;


public class SqlDdlNodes {

/** Creates a CREATE TABLE. */
public static SqlCreateTable createTable(SqlParserPos pos, boolean replace, boolean ifNotExists, SqlIdentifier name,
SqlNodeList columnList, SqlNode query, SqlNode tableSerializer, SqlNodeList tableFileFormat,
SqlCharStringLiteral tableRowFormat) {
return new SqlCreateTable(pos, replace, ifNotExists, name, columnList, query, tableSerializer, tableFileFormat,
tableRowFormat);
}
Comment on lines +17 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds redundant? @aastha25

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import com.linkedin.coral.common.HiveMetastoreClient;
import com.linkedin.coral.common.ToRelConverter;
import com.linkedin.coral.common.calcite.DdlSqlValidator;
import com.linkedin.coral.hive.hive2rel.functions.HiveFunctionResolver;
import com.linkedin.coral.hive.hive2rel.functions.StaticHiveFunctionRegistry;
import com.linkedin.coral.hive.hive2rel.parsetree.ParseTreeBuilder;
import com.linkedin.coral.hive.hive2rel.validators.HiveDdlSqlValidator;

import static com.linkedin.coral.hive.hive2rel.HiveSqlConformance.HIVE_SQL;

Expand All @@ -52,6 +54,7 @@ public class HiveToRelConverter extends ToRelConverter {
// The validator must be reused
SqlValidator sqlValidator = new HiveSqlValidator(getOperatorTable(), getCalciteCatalogReader(),
((JavaTypeFactory) getRelBuilder().getTypeFactory()), HIVE_SQL);
// DDL validator applied to parse trees in toSqlNode(); private final since it is never reassigned.
private final DdlSqlValidator ddlSqlValidator = new HiveDdlSqlValidator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also make this private final?


public HiveToRelConverter(HiveMetastoreClient hiveMetastoreClient) {
super(hiveMetastoreClient);
Expand Down Expand Up @@ -92,7 +95,9 @@ protected SqlToRelConverter getSqlToRelConverter() {

@Override
protected SqlNode toSqlNode(String sql, Table hiveView) {
  // Parse first, then run DDL validation on the resulting parse tree before returning it.
  SqlNode sqlNode = parseTreeBuilder.process(trimParenthesis(sql), hiveView);
  ddlSqlValidator.validate(sqlNode);
  return sqlNode;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,114 @@ protected R visit(ASTNode node, C ctx) {
case HiveParser.KW_CURRENT:
return visitCurrentRow(node, ctx);

case HiveParser.TOK_CREATETABLE:
return visitCreateTable(node, ctx);
case HiveParser.TOK_LIKETABLE:
return visitLikeTable(node, ctx);
case HiveParser.TOK_IFNOTEXISTS:
return visitIfNotExists(node, ctx);
case HiveParser.TOK_TABCOLLIST:
return visitColumnList(node, ctx);
case HiveParser.TOK_TABCOL:
return visitColumn(node, ctx);
case HiveParser.TOK_FILEFORMAT_GENERIC:
return visitFileFormatGeneric(node, ctx);
case HiveParser.TOK_TABLEFILEFORMAT:
return visitTableFileFormat(node, ctx);
case HiveParser.TOK_TABLESERIALIZER:
return visitTableSerializer(node, ctx);
case HiveParser.TOK_SERDENAME:
return visitSerdeName(node, ctx);
case HiveParser.TOK_TABLEROWFORMAT:
return visitTableRowFormat(node, ctx);
case HiveParser.TOK_SERDEPROPS:
return visitSerdeProps(node, ctx);
case HiveParser.TOK_TABLEROWFORMATFIELD:
return visitTableRowFormatField(node, ctx);
default:
// return visitChildren(node, ctx);
throw new UnhandledASTTokenException(node);
}
}

/** TOK_TABLEROWFORMATFIELD handler: result of the first child, or null when childless. */
protected R visitTableRowFormatField(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_SERDEPROPS handler: result of the first child, or null when childless. */
protected R visitSerdeProps(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_TABLEROWFORMAT handler: result of the first child, or null when childless. */
protected R visitTableRowFormat(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_SERDENAME handler: result of the first child, or null when childless. */
protected R visitSerdeName(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_TABLESERIALIZER handler: result of the first child, or null when childless. */
protected R visitTableSerializer(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_TABLEFILEFORMAT handler: result of the first child, or null when childless. */
protected R visitTableFileFormat(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_FILEFORMAT_GENERIC handler: result of the first child, or null when childless. */
protected R visitFileFormatGeneric(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_TABCOL handler: result of the first child, or null when childless. */
protected R visitColumn(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

// TOK_TABCOLLIST handler: no null guard — assumes a column list always has children.
protected R visitColumnList(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

/** TOK_IFNOTEXISTS handler: result of the first child, or null when childless. */
protected R visitIfNotExists(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

/** TOK_LIKETABLE handler: result of the first child, or null when childless. */
protected R visitLikeTable(ASTNode node, C ctx) {
  return node.getChildren() == null ? null : visitChildren(node, ctx).get(0);
}

// TOK_CREATETABLE handler: no null guard — assumes a CREATE TABLE node always has children.
protected R visitCreateTable(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

// Keyword-literal handler: delegates to the first (and presumably only) child — TODO confirm arity.
protected R visitKeywordLiteral(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}
Expand Down
Loading