
Commit cdbdd4e

Coral-Spark: Migrate some operator transformers from RelNode layer to SqlNode layer (#351)
1 parent e4d4db9 commit cdbdd4e

File tree

16 files changed: +549 -525 lines changed
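
Before the per-file diffs, here is a minimal sketch of what the migration amounts to in the translation pipeline. The class name is invented; the method body is condensed from the constructSparkSQL diff in CoralSpark.java below, and it assumes the Coral and Calcite artifacts are on the classpath:

package com.linkedin.coral.spark; // placed here so the package-level converters in this commit resolve

import java.util.HashSet;
import java.util.Set;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.SqlNode;

import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.dialect.SparkSqlDialect;
import com.linkedin.coral.transformers.CoralRelToSqlNodeConverter;

public class SqlNodeLayerPipelineSketch {
  // Condensed from constructSparkSQL in the CoralSpark.java diff below: operator
  // transformations now run on the SqlNode tree, via the new CoralToSparkSqlCallConverter,
  // after RelNode-to-SqlNode conversion, instead of on the RelNode itself.
  static String toSparkSql(RelNode sparkRelNode) {
    Set<SparkUDFInfo> sparkUDFInfos = new HashSet<>(); // filled as a side effect of the SqlNode pass
    SqlNode coralSqlNode = new CoralRelToSqlNodeConverter().convert(sparkRelNode);
    SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
        .accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
    return sparkSqlNode.accept(new SparkSqlRewriter()).toSqlString(SparkSqlDialect.INSTANCE).getSql();
  }
}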

coral-hive/src/main/java/com/linkedin/coral/transformers/CoralRelToSqlNodeConverter.java

Lines changed: 47 additions & 0 deletions
@@ -6,6 +6,7 @@
 package com.linkedin.coral.transformers;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -25,6 +26,7 @@
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.sql.JoinConditionType;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlCall;
@@ -43,6 +45,7 @@
 import com.linkedin.coral.com.google.common.collect.ImmutableList;
 import com.linkedin.coral.com.google.common.collect.ImmutableMap;
 import com.linkedin.coral.common.functions.CoralSqlUnnestOperator;
+import com.linkedin.coral.common.functions.FunctionFieldReferenceOperator;
 
 
 /**
@@ -345,4 +348,48 @@ private SqlNode generateRightChildForSqlJoinWithLateralViews(BiRel e, Result rig
 
     return SqlStdOperatorTable.AS.createCall(POS, asOperands);
   }
+
+  /**
+   * Override this method to handle the conversion for {@link RexFieldAccess} `f(x).y`, where `f` is an operator
+   * which returns a struct containing field `y`.
+   *
+   * Calcite converts it to a {@link SqlIdentifier} with {@link SqlIdentifier#names} as ["f(x)", "y"], where "f(x)" and "y" are Strings.
+   * That is opaque and not aligned with our expectation, since we want to apply transformations on `f(x)` with
+   * {@link com.linkedin.coral.common.transformers.SqlCallTransformer}. Therefore, we override this
+   * method to convert `f(x)` to a {@link SqlCall} and `.` to {@link com.linkedin.coral.common.functions.FunctionFieldReferenceOperator#DOT}.
+   *
+   * With this override, the converted CoralSqlNode matches the previous SqlNode handed over to Calcite for validation and conversion
+   * in `HiveSqlToRelConverter#convertQuery`.
+   *
+   * Check `CoralSparkTest#testConvertFieldAccessOnFunctionCall` for a more complex example with nested field access.
+   */
+  @Override
+  public Context aliasContext(Map<String, RelDataType> aliases, boolean qualified) {
+    return new AliasContext(INSTANCE, aliases, qualified) {
+      @Override
+      public SqlNode toSql(RexProgram program, RexNode rex) {
+        if (rex.getKind() == SqlKind.FIELD_ACCESS) {
+          final List<String> accessNames = new ArrayList<>();
+          RexNode referencedExpr = rex;
+          // Use the loop to get the top-level struct (`f(x)` in the example above),
+          // and store the accessed field names ([`y`] in the example above)
+          while (referencedExpr.getKind() == SqlKind.FIELD_ACCESS) {
+            accessNames.add(((RexFieldAccess) referencedExpr).getField().getName());
+            referencedExpr = ((RexFieldAccess) referencedExpr).getReferenceExpr();
+          }
+          final SqlKind sqlKind = referencedExpr.getKind();
+          if (sqlKind == SqlKind.OTHER_FUNCTION || sqlKind == SqlKind.CAST || sqlKind == SqlKind.ROW) {
+            SqlNode functionCall = toSql(program, referencedExpr);
+            Collections.reverse(accessNames);
+            for (String accessName : accessNames) {
+              functionCall = FunctionFieldReferenceOperator.DOT.createCall(SqlParserPos.ZERO, functionCall,
+                  new SqlIdentifier(accessName, POS));
+            }
+            return functionCall;
+          }
+        }
+        return super.toSql(program, rex);
+      }
+    };
+  }
 }
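
To make the override concrete, the following self-contained sketch (class and variable names are invented for illustration) shows the SqlNode shape it produces for a nested access `f(x).y.z`: the struct-returning expression stays a SqlCall, and each access level becomes one more FunctionFieldReferenceOperator#DOT call wrapped around it. The override collects field names outermost-first and reverses them so the DOT calls are built from the innermost access outward.

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;

import com.linkedin.coral.common.functions.FunctionFieldReferenceOperator;

public class DotCallShapeSketch {
  public static void main(String[] args) {
    SqlParserPos pos = SqlParserPos.ZERO;
    // Stand-in for the struct-returning `f(x)`: a ROW constructor call, one of the
    // kinds the override handles (OTHER_FUNCTION, CAST, ROW)
    SqlNode fx = SqlStdOperatorTable.ROW.createCall(pos, new SqlIdentifier("x", pos));
    // `f(x).y`: the DOT operator keeps the inner call visible to SqlCallTransformers,
    // instead of collapsing everything into an opaque SqlIdentifier
    SqlNode fxY = FunctionFieldReferenceOperator.DOT.createCall(pos, fx, new SqlIdentifier("y", pos));
    // Nested access `f(x).y.z` is just one more DOT layer around the previous call
    SqlNode fxYZ = FunctionFieldReferenceOperator.DOT.createCall(pos, fxY, new SqlIdentifier("z", pos));
    System.out.println(fxYZ);
  }
}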

coral-spark/src/main/java/com/linkedin/coral/spark/BuiltinUDFMap.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

coral-spark/src/main/java/com/linkedin/coral/spark/CoralSpark.java

Lines changed: 16 additions & 10 deletions
@@ -6,6 +6,7 @@
 package com.linkedin.coral.spark;
 
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
@@ -16,6 +17,7 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlSelect;
 
+import com.linkedin.coral.com.google.common.collect.ImmutableList;
 import com.linkedin.coral.spark.containers.SparkRelInfo;
 import com.linkedin.coral.spark.containers.SparkUDFInfo;
 import com.linkedin.coral.spark.dialect.SparkSqlDialect;
@@ -63,11 +65,11 @@ private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList,
    */
   public static CoralSpark create(RelNode irRelNode) {
     SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
+    Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
     RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
-    String sparkSQL = constructSparkSQL(sparkRelNode);
+    String sparkSQL = constructSparkSQL(sparkRelNode, sparkUDFInfos);
     List<String> baseTables = constructBaseTables(sparkRelNode);
-    List<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfoList();
-    return new CoralSpark(baseTables, sparkUDFInfos, sparkSQL);
+    return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
   }
 
   /**
@@ -86,11 +88,11 @@ public static CoralSpark create(RelNode irRelNode, Schema schema) {
 
   private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliases) {
     SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
+    Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
     RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
-    String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases);
+    String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases, sparkUDFInfos);
     List<String> baseTables = constructBaseTables(sparkRelNode);
-    List<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfoList();
-    return new CoralSpark(baseTables, sparkUDFInfos, sparkSQL);
+    return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
   }
 
   /**
@@ -105,21 +107,25 @@ private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliase
    *
    * @param sparkRelNode A Spark compatible RelNode
    *
+   * @param sparkUDFInfos A set of Spark UDF information objects
    * @return SQL String in Spark SQL dialect which is 'completely expanded'
    */
-  private static String constructSparkSQL(RelNode sparkRelNode) {
+  private static String constructSparkSQL(RelNode sparkRelNode, Set<SparkUDFInfo> sparkUDFInfos) {
     CoralRelToSqlNodeConverter rel2sql = new CoralRelToSqlNodeConverter();
     SqlNode coralSqlNode = rel2sql.convert(sparkRelNode);
-    SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter());
+    SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
+        .accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
     SqlNode rewrittenSparkSqlNode = sparkSqlNode.accept(new SparkSqlRewriter());
     return rewrittenSparkSqlNode.toSqlString(SparkSqlDialect.INSTANCE).getSql();
   }
 
-  private static String constructSparkSQLWithExplicitAlias(RelNode sparkRelNode, List<String> aliases) {
+  private static String constructSparkSQLWithExplicitAlias(RelNode sparkRelNode, List<String> aliases,
+      Set<SparkUDFInfo> sparkUDFInfos) {
     CoralRelToSqlNodeConverter rel2sql = new CoralRelToSqlNodeConverter();
     // Create temporary objects r and rewritten to make debugging easier
     SqlNode coralSqlNode = rel2sql.convert(sparkRelNode);
-    SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter());
+    SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
+        .accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
 
     SqlNode rewritten = sparkSqlNode.accept(new SparkSqlRewriter());
     // Use a second pass visit to add explicit alias names,
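
From a caller's point of view the entry points are unchanged; a brief usage sketch follows (the accessor names are assumed from the existing CoralSpark API, as they do not appear in this diff). UDF information is now gathered as a side effect of the SqlNode-layer pass and exposed as an immutable list:

import java.util.List;

import org.apache.calcite.rel.RelNode;

import com.linkedin.coral.spark.CoralSpark;
import com.linkedin.coral.spark.containers.SparkUDFInfo;

public class CoralSparkUsageSketch {
  // `coralIrRelNode` is assumed to come from Coral's view-to-RelNode conversion
  static void translate(RelNode coralIrRelNode) {
    CoralSpark coralSpark = CoralSpark.create(coralIrRelNode);
    List<SparkUDFInfo> udfs = coralSpark.getSparkUDFInfoList(); // register these before running the SQL
    udfs.forEach(udf -> System.out.println("register UDF: " + udf));
    System.out.println(coralSpark.getSparkSql()); // Spark SQL dialect string
  }
}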
coral-spark/src/main/java/com/linkedin/coral/spark/CoralToSparkSqlCallConverter.java

Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
+/**
+ * Copyright 2023 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD-2 Clause license.
+ * See LICENSE in the project root for license information.
+ */
+package com.linkedin.coral.spark;
+
+import java.util.Set;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.util.SqlShuttle;
+
+import com.linkedin.coral.common.transformers.OperatorRenameSqlCallTransformer;
+import com.linkedin.coral.common.transformers.SqlCallTransformers;
+import com.linkedin.coral.spark.containers.SparkUDFInfo;
+import com.linkedin.coral.spark.transformers.FallBackToLinkedInHiveUDFTransformer;
+import com.linkedin.coral.spark.transformers.TransportUDFTransformer;
+
+import static com.linkedin.coral.spark.transformers.TransportUDFTransformer.*;
+
+
+/**
+ * This class extends {@link org.apache.calcite.sql.util.SqlShuttle} and initializes a {@link com.linkedin.coral.common.transformers.SqlCallTransformers},
+ * which contains a list of {@link com.linkedin.coral.common.transformers.SqlCallTransformer} to traverse the hierarchy of a {@link org.apache.calcite.sql.SqlCall}
+ * and convert functions from the Coral operator to the Spark operator where required.
+ *
+ * In this converter, we need to apply {@link TransportUDFTransformer} before {@link FallBackToLinkedInHiveUDFTransformer},
+ * because we should try to transform a UDF to an equivalent Transport UDF before falling back to the LinkedIn Hive UDF.
+ */
+public class CoralToSparkSqlCallConverter extends SqlShuttle {
+  private final SqlCallTransformers sqlCallTransformers;
+
+  public CoralToSparkSqlCallConverter(Set<SparkUDFInfo> sparkUDFInfos) {
+    this.sqlCallTransformers = SqlCallTransformers.of(
+        // Transport UDFs
+        new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.DateFormatToEpoch",
+            "com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToDateFormat",
+            "com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToEpochMilliseconds",
+            "com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.isguestmemberid.hive.IsGuestMemberId",
+            "com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.istestmemberid.hive.IsTestMemberId",
+            "com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.maplookup.hive.MapLookup",
+            "com.linkedin.stdudfs.daliudfs.spark.MapLookup", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
+            sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.sanitize.hive.Sanitize",
+            "com.linkedin.stdudfs.daliudfs.spark.Sanitize", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
+            sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.dali.udf.watbotcrawlerlookup.hive.WATBotCrawlerLookup",
+            "com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.DateFormatToEpoch",
+            "com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToDateFormat",
+            "com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToEpochMilliseconds",
+            "com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.GetProfileSections",
+            "com.linkedin.stdudfs.daliudfs.spark.GetProfileSections", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.stringudfs.hive.InitCap",
+            "com.linkedin.stdudfs.stringudfs.spark.InitCap",
+            "ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:1.0.1?classifier=spark_2.11",
+            "ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:1.0.1?classifier=spark_2.12",
+            sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsGuestMemberId",
+            "com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsTestMemberId",
+            "com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.MapLookup",
+            "com.linkedin.stdudfs.daliudfs.spark.MapLookup", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
+            sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.PortalLookup",
+            "com.linkedin.stdudfs.daliudfs.spark.PortalLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.Sanitize",
+            "com.linkedin.stdudfs.daliudfs.spark.Sanitize", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
+            sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.userinterfacelookup.hive.UserInterfaceLookup",
+            "com.linkedin.stdudfs.userinterfacelookup.spark.UserInterfaceLookup",
+            "ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.27?classifier=spark_2.11",
+            "ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.27?classifier=spark_2.12",
+            sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.WatBotCrawlerLookup",
+            "com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
+            DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.jemslookup.udf.hive.JemsLookup",
+            "com.linkedin.jemslookup.udf.spark.JemsLookup",
+            "ivy://com.linkedin.jobs-udf:jems-udfs:2.1.7?classifier=spark_2.11",
+            "ivy://com.linkedin.jobs-udf:jems-udfs:2.1.7?classifier=spark_2.12", sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.parsing.hive.UserAgentParser",
+            "com.linkedin.stdudfs.parsing.spark.UserAgentParser",
+            "ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
+            "ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.parsing.hive.Ip2Str",
+            "com.linkedin.stdudfs.parsing.spark.Ip2Str",
+            "ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
+            "ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.stdudfs.lookup.hive.BrowserLookup",
+            "com.linkedin.stdudfs.lookup.spark.BrowserLookup",
+            "ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
+            "ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),
+
+        new TransportUDFTransformer("com.linkedin.jobs.udf.hive.ConvertIndustryCode",
+            "com.linkedin.jobs.udf.spark.ConvertIndustryCode",
+            "ivy://com.linkedin.jobs-udf:jobs-udfs:2.1.6?classifier=spark_2.11",
+            "ivy://com.linkedin.jobs-udf:jobs-udfs:2.1.6?classifier=spark_2.12", sparkUDFInfos),
+
+        // Transport UDF for unit test
+        new TransportUDFTransformer("com.linkedin.coral.hive.hive2rel.CoralTestUDF",
+            "com.linkedin.coral.spark.CoralTestUDF",
+            "ivy://com.linkedin.coral.spark.CoralTestUDF?classifier=spark_2.11", null, sparkUDFInfos),
+
+        // Built-in operator
+        new OperatorRenameSqlCallTransformer(SqlStdOperatorTable.CARDINALITY, 1, "size"),
+
+        // Fall back to the original Hive UDF defined in StaticHiveFunctionRegistry after failing to apply the transformers above
+        new FallBackToLinkedInHiveUDFTransformer(sparkUDFInfos));
+  }
+
+  @Override
+  public SqlNode visit(SqlCall call) {
+    final SqlCall transformedSqlCall = sqlCallTransformers.apply(call);
+    return super.visit(transformedSqlCall);
+  }
+}
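
As a small end-to-end check of the shuttle, the sketch below (a hypothetical test driver, not part of this commit) builds a bare CARDINALITY call and runs it through the converter; given the OperatorRenameSqlCallTransformer registered above, the result should render as a call to size rather than CARDINALITY.

import java.util.HashSet;
import java.util.Set;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;

import com.linkedin.coral.spark.CoralToSparkSqlCallConverter;
import com.linkedin.coral.spark.containers.SparkUDFInfo;

public class CardinalityRenameSketch {
  public static void main(String[] args) {
    // CARDINALITY(arr), the Coral-side representation of an array-size call
    SqlCall cardinality = SqlStdOperatorTable.CARDINALITY.createCall(SqlParserPos.ZERO,
        new SqlIdentifier("arr", SqlParserPos.ZERO));

    // The converter records any UDFs needing registration into this set as a side effect;
    // a plain operator rename like this one adds nothing to it
    Set<SparkUDFInfo> sparkUDFInfos = new HashSet<>();
    SqlNode converted = cardinality.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));

    System.out.println(converted); // expected to print a `size` call over `arr`
  }
}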
