Coral-Spark: Migrate some operator transformers from RelNode layer to SqlNode layer #351

Merged · 5 commits · Feb 10, 2023
@@ -6,6 +6,7 @@
package com.linkedin.coral.transformers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -25,6 +26,7 @@
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.sql.JoinConditionType;
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlCall;
@@ -43,6 +45,7 @@
import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.com.google.common.collect.ImmutableMap;
import com.linkedin.coral.common.functions.CoralSqlUnnestOperator;
import com.linkedin.coral.common.functions.FunctionFieldReferenceOperator;


/**
@@ -345,4 +348,48 @@ private SqlNode generateRightChildForSqlJoinWithLateralViews(BiRel e, Result rig

return SqlStdOperatorTable.AS.createCall(POS, asOperands);
}

/**
* Overrides this method to handle the conversion of a {@link RexFieldAccess} expression `f(x).y`, where `f` is an
* operator that returns a struct containing the field `y`.
*
* By default, Calcite converts such an expression to a {@link SqlIdentifier} whose {@link SqlIdentifier#names} are the
* plain Strings ["f(x)", "y"]. That representation is opaque and does not meet our needs, because we want to apply
* transformations to `f(x)` with {@link com.linkedin.coral.common.transformers.SqlCallTransformer}. Therefore, this
* override converts `f(x)` to a {@link SqlCall} and the `.` to {@link com.linkedin.coral.common.functions.FunctionFieldReferenceOperator#DOT}.
*
* With this override, the converted CoralSqlNode matches the SqlNode previously handed over to Calcite for validation
* and conversion in `HiveSqlToRelConverter#convertQuery`.
*
* See `CoralSparkTest#testConvertFieldAccessOnFunctionCall` for a more complex example with nested field access.
*/
@Override
public Context aliasContext(Map<String, RelDataType> aliases, boolean qualified) {
return new AliasContext(INSTANCE, aliases, qualified) {
@Override
public SqlNode toSql(RexProgram program, RexNode rex) {
if (rex.getKind() == SqlKind.FIELD_ACCESS) {
final List<String> accessNames = new ArrayList<>();
RexNode referencedExpr = rex;
// Walk down nested field accesses to reach the base expression (`f(x)` in the example above),
// collecting the accessed field names along the way ([`y`] in the example above)
while (referencedExpr.getKind() == SqlKind.FIELD_ACCESS) {
accessNames.add(((RexFieldAccess) referencedExpr).getField().getName());
referencedExpr = ((RexFieldAccess) referencedExpr).getReferenceExpr();
}
final SqlKind sqlKind = referencedExpr.getKind();
if (sqlKind == SqlKind.OTHER_FUNCTION || sqlKind == SqlKind.CAST || sqlKind == SqlKind.ROW) {
SqlNode functionCall = toSql(program, referencedExpr);
Collections.reverse(accessNames);
for (String accessName : accessNames) {
functionCall = FunctionFieldReferenceOperator.DOT.createCall(SqlParserPos.ZERO, functionCall,
new SqlIdentifier(accessName, POS));
}
return functionCall;
}
}
return super.toSql(program, rex);
}
};
}
}
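For illustration, here is a minimal runnable sketch of the SqlNode shape this override produces. The class name, the `f_of_x` placeholder, and the field name `y` are hypothetical stand-ins; `FunctionFieldReferenceOperator.DOT` and the Calcite types are the ones used in the change above.

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;

import com.linkedin.coral.common.functions.FunctionFieldReferenceOperator;

public class FieldAccessShapeSketch {
  public static void main(String[] args) {
    // Stand-in for the converted `f(x)` SqlCall; in the converter itself this
    // node is produced by `toSql(program, referencedExpr)`.
    SqlNode functionCall = new SqlIdentifier("f_of_x", SqlParserPos.ZERO);
    // Instead of one opaque SqlIdentifier with names ["f(x)", "y"], the
    // override emits DOT(f(x), y), whose operands stay visible to
    // SqlCallTransformer implementations.
    SqlCall fieldAccess = FunctionFieldReferenceOperator.DOT.createCall(SqlParserPos.ZERO,
        functionCall, new SqlIdentifier("y", SqlParserPos.ZERO));
    System.out.println(fieldAccess);
  }
}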

This file was deleted.

26 changes: 16 additions & 10 deletions coral-spark/src/main/java/com/linkedin/coral/spark/CoralSpark.java
@@ -6,6 +6,7 @@
package com.linkedin.coral.spark;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
@@ -16,6 +17,7 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;

import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.spark.containers.SparkRelInfo;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.dialect.SparkSqlDialect;
@@ -63,11 +65,11 @@ private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList,
*/
public static CoralSpark create(RelNode irRelNode) {
SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
String sparkSQL = constructSparkSQL(sparkRelNode);
String sparkSQL = constructSparkSQL(sparkRelNode, sparkUDFInfos);
List<String> baseTables = constructBaseTables(sparkRelNode);
List<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfoList();
return new CoralSpark(baseTables, sparkUDFInfos, sparkSQL);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
}

/**
@@ -86,11 +88,11 @@ public static CoralSpark create(RelNode irRelNode, Schema schema) {

private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliases) {
SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases);
String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases, sparkUDFInfos);
List<String> baseTables = constructBaseTables(sparkRelNode);
List<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfoList();
return new CoralSpark(baseTables, sparkUDFInfos, sparkSQL);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
}

/**
@@ -105,21 +107,25 @@ private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliase
*
* @param sparkRelNode A Spark compatible RelNode
*
* @param sparkUDFInfos A set of Spark UDF information objects
* @return SQL String in Spark SQL dialect which is 'completely expanded'
*/
private static String constructSparkSQL(RelNode sparkRelNode) {
private static String constructSparkSQL(RelNode sparkRelNode, Set<SparkUDFInfo> sparkUDFInfos) {
CoralRelToSqlNodeConverter rel2sql = new CoralRelToSqlNodeConverter();
SqlNode coralSqlNode = rel2sql.convert(sparkRelNode);
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter());
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
SqlNode rewrittenSparkSqlNode = sparkSqlNode.accept(new SparkSqlRewriter());
return rewrittenSparkSqlNode.toSqlString(SparkSqlDialect.INSTANCE).getSql();
}

private static String constructSparkSQLWithExplicitAlias(RelNode sparkRelNode, List<String> aliases) {
private static String constructSparkSQLWithExplicitAlias(RelNode sparkRelNode, List<String> aliases,
Set<SparkUDFInfo> sparkUDFInfos) {
CoralRelToSqlNodeConverter rel2sql = new CoralRelToSqlNodeConverter();
// Keep intermediate conversion results in local variables to make debugging easier
SqlNode coralSqlNode = rel2sql.convert(sparkRelNode);
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter());
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));

SqlNode rewritten = sparkSqlNode.accept(new SparkSqlRewriter());
// Use a second pass visit to add explicit alias names,
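For context, a minimal usage sketch of this entry point. The `HiveToRelConverter`/`HiveMetastoreClient` setup and the `getSparkSql()` accessor are assumptions about Coral's public API, not part of this diff.

import org.apache.calcite.rel.RelNode;

import com.linkedin.coral.common.HiveMetastoreClient;
import com.linkedin.coral.hive.hive2rel.HiveToRelConverter;
import com.linkedin.coral.spark.CoralSpark;

public class CoralSparkUsageSketch {
  public static String translate(HiveMetastoreClient metastoreClient, String hiveSql) {
    // HiveQL -> Coral IR RelNode -> Spark SQL, running the RelNode-layer and
    // SqlNode-layer transformations shown in this diff.
    RelNode irRelNode = new HiveToRelConverter(metastoreClient).convertSql(hiveSql);
    return CoralSpark.create(irRelNode).getSparkSql();
  }
}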
@@ -0,0 +1,164 @@
/**
* Copyright 2023 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark;

import java.util.Set;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.SqlShuttle;

import com.linkedin.coral.common.transformers.OperatorRenameSqlCallTransformer;
import com.linkedin.coral.common.transformers.SqlCallTransformers;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.transformers.FallBackToLinkedInHiveUDFTransformer;
import com.linkedin.coral.spark.transformers.TransportUDFTransformer;

import static com.linkedin.coral.spark.transformers.TransportUDFTransformer.*;


/**
* This class extends {@link org.apache.calcite.sql.util.SqlShuttle} and initializes a {@link com.linkedin.coral.common.transformers.SqlCallTransformers}
* containing a list of {@link com.linkedin.coral.common.transformers.SqlCallTransformer}s, which traverses the hierarchy of a {@link org.apache.calcite.sql.SqlCall}
* and converts functions from Coral operators to Spark operators where required.
*
* In this converter, {@link TransportUDFTransformer} must be applied before {@link FallBackToLinkedInHiveUDFTransformer},
* because we should first try to transform a UDF to its equivalent Transport UDF before falling back to the LinkedIn Hive UDF.
*/
public class CoralToSparkSqlCallConverter extends SqlShuttle {
private final SqlCallTransformers sqlCallTransformers;

public CoralToSparkSqlCallConverter(Set<SparkUDFInfo> sparkUDFInfos) {
this.sqlCallTransformers = SqlCallTransformers.of(
// Transport UDFs
new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.DateFormatToEpoch",
"com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToDateFormat",
"com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToEpochMilliseconds",
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.isguestmemberid.hive.IsGuestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.istestmemberid.hive.IsTestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.maplookup.hive.MapLookup",
"com.linkedin.stdudfs.daliudfs.spark.MapLookup", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.sanitize.hive.Sanitize",
"com.linkedin.stdudfs.daliudfs.spark.Sanitize", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.dali.udf.watbotcrawlerlookup.hive.WATBotCrawlerLookup",
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.DateFormatToEpoch",
"com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToDateFormat",
"com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToEpochMilliseconds",
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.GetProfileSections",
"com.linkedin.stdudfs.daliudfs.spark.GetProfileSections", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.stringudfs.hive.InitCap",
"com.linkedin.stdudfs.stringudfs.spark.InitCap",
"ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:1.0.1?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:1.0.1?classifier=spark_2.12",
sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsGuestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsTestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.MapLookup",
"com.linkedin.stdudfs.daliudfs.spark.MapLookup", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.PortalLookup",
"com.linkedin.stdudfs.daliudfs.spark.PortalLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.Sanitize",
"com.linkedin.stdudfs.daliudfs.spark.Sanitize", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.userinterfacelookup.hive.UserInterfaceLookup",
"com.linkedin.stdudfs.userinterfacelookup.spark.UserInterfaceLookup",
"ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.27?classifier=spark_2.11",
"ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.27?classifier=spark_2.12",
sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.WatBotCrawlerLookup",
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.jemslookup.udf.hive.JemsLookup",
"com.linkedin.jemslookup.udf.spark.JemsLookup",
"ivy://com.linkedin.jobs-udf:jems-udfs:2.1.7?classifier=spark_2.11",
"ivy://com.linkedin.jobs-udf:jems-udfs:2.1.7?classifier=spark_2.12", sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.parsing.hive.UserAgentParser",
"com.linkedin.stdudfs.parsing.spark.UserAgentParser",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.parsing.hive.Ip2Str",
"com.linkedin.stdudfs.parsing.spark.Ip2Str",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.stdudfs.lookup.hive.BrowserLookup",
"com.linkedin.stdudfs.lookup.spark.BrowserLookup",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),

new TransportUDFTransformer("com.linkedin.jobs.udf.hive.ConvertIndustryCode",
"com.linkedin.jobs.udf.spark.ConvertIndustryCode",
"ivy://com.linkedin.jobs-udf:jobs-udfs:2.1.6?classifier=spark_2.11",
"ivy://com.linkedin.jobs-udf:jobs-udfs:2.1.6?classifier=spark_2.12", sparkUDFInfos),

// Transport UDF for unit test
new TransportUDFTransformer("com.linkedin.coral.hive.hive2rel.CoralTestUDF",
"com.linkedin.coral.spark.CoralTestUDF",
"ivy://com.linkedin.coral.spark.CoralTestUDF?classifier=spark_2.11", null, sparkUDFInfos),

// Built-in operator
new OperatorRenameSqlCallTransformer(SqlStdOperatorTable.CARDINALITY, 1, "size"),

// Fall back to the original Hive UDF defined in StaticHiveFunctionRegistry after failing to apply transformers above
new FallBackToLinkedInHiveUDFTransformer(sparkUDFInfos));
}

@Override
public SqlNode visit(SqlCall call) {
final SqlCall transformedSqlCall = sqlCallTransformers.apply(call);
return super.visit(transformedSqlCall);
}
}
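A minimal sketch of how this shuttle is applied, mirroring the call sites added in CoralSpark#constructSparkSQL above. The wrapper class name is hypothetical, and `coralSqlNode` is assumed to be the output of CoralRelToSqlNodeConverter#convert.

import java.util.Set;

import org.apache.calcite.sql.SqlNode;

import com.linkedin.coral.spark.CoralToSparkSqlCallConverter;
import com.linkedin.coral.spark.containers.SparkUDFInfo;

public class SqlCallConverterUsageSketch {
  public static SqlNode toSparkSqlNode(SqlNode coralSqlNode, Set<SparkUDFInfo> sparkUDFInfos) {
    // The shuttle applies the configured transformers to every SqlCall it
    // visits and, as a side effect, records each UDF that needs registration
    // into the supplied set.
    return coralSqlNode.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
  }
}

Passing the mutable set in lets the shuttle report the Spark UDF registrations a query needs without changing the visitor's SqlNode return type.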