Skip to content

Commit df1c2ff

Browse files
morrySnow authored and zhiqiang-hhhh committed
[opt](Nereids) push down virtual column into scan (apache#50521)
1 parent 16ba7fa commit df1c2ff

File tree

16 files changed

+262
-69
lines changed

16 files changed

+262
-69
lines changed

fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class SlotDescriptor {
7474
// materialize them.Used to optimize to read less data and less memory usage
7575
private boolean needMaterialize = true;
7676
private boolean isAutoInc = false;
77+
private Expr virtualColumn = null;
7778

7879
public SlotDescriptor(SlotId id, TupleDescriptor parent) {
7980
this.id = id;
@@ -268,6 +269,14 @@ public int getUniqueId() {
268269
return column.getUniqueId();
269270
}
270271

272+
public Expr getVirtualColumn() {
273+
return virtualColumn;
274+
}
275+
276+
public void setVirtualColumn(Expr virtualColumn) {
277+
this.virtualColumn = virtualColumn;
278+
}
279+
271280
/**
272281
* Initializes a slot by setting its source expression information
273282
*/
@@ -322,6 +331,9 @@ public TSlotDescriptor toThrift() {
322331
if (subColPath != null) {
323332
tSlotDescriptor.setColumnPaths(subColPath);
324333
}
334+
if (virtualColumn != null) {
335+
tSlotDescriptor.setVirtualColumnExpr(virtualColumn.treeToThrift());
336+
}
325337
return tSlotDescriptor;
326338
}
327339

@@ -332,7 +344,8 @@ public String debugString() {
332344
return MoreObjects.toStringHelper(this).add("id", id.asInt()).add("parent", parentTupleId).add("col", colStr)
333345
.add("type", typeStr).add("materialized", isMaterialized).add("byteSize", byteSize)
334346
.add("byteOffset", byteOffset).add("slotIdx", slotIdx).add("nullable", getIsNullable())
335-
.add("isAutoIncrement", isAutoInc).add("subColPath", subColPath).toString();
347+
.add("isAutoIncrement", isAutoInc).add("subColPath", subColPath)
348+
// virtualColumn stays null unless setVirtualColumn() was called, so guard the
// toSql() call; otherwise debugString() throws NullPointerException for ordinary slots
.add("virtualColumn", virtualColumn == null ? null : virtualColumn.toSql()).toString();
336349
}
337350

338351
@Override
@@ -350,6 +363,7 @@ public String getExplainString(String prefix) {
350363
.append(", nullable=").append(isNullable)
351364
.append(", isAutoIncrement=").append(isAutoInc)
352365
.append(", subColPath=").append(subColPath)
366+
.append(", virtualColumn=").append(virtualColumn)
353367
.append("}")
354368
.toString();
355369
}

fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ protected void toThrift(TExprNode msg) {
350350
msg.node_type = TExprNodeType.SLOT_REF;
351351
msg.slot_ref = new TSlotRef(desc.getId().asInt(), desc.getParent().getId().asInt());
352352
msg.slot_ref.setColUniqueId(desc.getUniqueId());
353+
msg.slot_ref.setIsVirtualSlot(desc.getVirtualColumn() != null);
353354
msg.setLabel(label);
354355
}
355356

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,18 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan,
818818
OlapTable olapTable = olapScan.getTable();
819819
// generate real output tuple
820820
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, context);
821+
822+
// put virtual column expr into slot desc
823+
Map<ExprId, Expression> slotToVirtualColumnMap = olapScan.getSlotToVirtualColumnMap();
824+
for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
825+
ExprId exprId = context.findExprId(slotDescriptor.getId());
826+
if (slotToVirtualColumnMap.containsKey(exprId)) {
827+
slotDescriptor.setVirtualColumn(ExpressionTranslator.translate(
828+
slotToVirtualColumnMap.get(exprId), context));
829+
context.getVirtualColumnIds().add(slotDescriptor.getId());
830+
}
831+
}
832+
821833
// generate base index tuple because this fragment's partition exprs rely on slots of the base index
822834
if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId()) {
823835
generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
@@ -2716,6 +2728,16 @@ private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, Pl
27162728
return tupleDescriptor;
27172729
}
27182730

2731+
private TupleDescriptor generateTupleDescWithVirtualColumns(List<Slot> slotList, List<Expression> virtualColumns,
2732+
TableIf table, PlanTranslatorContext context) {
2733+
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
2734+
tupleDescriptor.setTable(table);
2735+
for (Slot slot : slotList) {
2736+
context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
2737+
}
2738+
return tupleDescriptor;
2739+
}
2740+
27192741
private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment leftFragment,
27202742
PlanFragment rightFragment, PlanTranslatorContext context, AbstractPlan join) {
27212743
hashJoinNode.setChild(0, leftFragment.getPlanRoot());

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public class PlanTranslatorContext {
117117
private final Map<ScanNode, Set<SlotId>> statsUnknownColumnsMap = Maps.newHashMap();
118118

119119
private boolean isTopMaterializeNode = true;
120+
private final Set<SlotId> virtualColumnIds = Sets.newHashSet();
120121

121122
public PlanTranslatorContext(CascadesContext ctx) {
122123
this.connectContext = ctx.getConnectContext();
@@ -354,4 +355,7 @@ public void setTopMaterializeNode(boolean topMaterializeNode) {
354355
isTopMaterializeNode = topMaterializeNode;
355356
}
356357

358+
public Set<SlotId> getVirtualColumnIds() {
359+
return virtualColumnIds;
360+
}
357361
}

fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughJoin;
133133
import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughUnion;
134134
import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughWindow;
135+
import org.apache.doris.nereids.rules.rewrite.PushDownVirualColumnsIntoOlapScan;
135136
import org.apache.doris.nereids.rules.rewrite.PushFilterInsideJoin;
136137
import org.apache.doris.nereids.rules.rewrite.PushProjectIntoOneRowRelation;
137138
import org.apache.doris.nereids.rules.rewrite.PushProjectIntoUnion;
@@ -468,6 +469,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
468469
// this rule is to collect filter on basic table for hbo usage
469470
topDown(new CollectPredicateOnScan())
470471
),
472+
topDown(new PushDownVirualColumnsIntoOlapScan()),
473+
471474
topic("Push project and filter on cte consumer to cte producer",
472475
topDown(
473476
new CollectFilterAboveConsumer(),

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionCollector.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ private int collectCommonExpressionByDepth(int depth, Expression expr, boolean i
6161
// ArrayItemSlot and ArrayItemReference could not be common expressions
6262
// TODO: could not extract common expression when expression contains same lambda expression
6363
// because ArrayItemSlot in Lambda are not same.
64-
if (expressions.contains(expr)
65-
&& !(inLambda && expr.containsType(ArrayItemSlot.class, ArrayItemReference.class))) {
64+
if (!(inLambda && expr.containsType(ArrayItemSlot.class, ArrayItemReference.class))) {
6665
Set<Expression> commonExpression = getExpressionsFromDepthMap(depth, commonExprByDepth);
6766
commonExpression.add(expr);
6867
}

fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ public List<PlanPostProcessor> getProcessors() {
6767
}
6868
builder.add(new MergeProjectPostProcessor());
6969
builder.add(new RecomputeLogicalPropertiesProcessor());
70-
if (cascadesContext.getConnectContext().getSessionVariable().enableAggregateCse) {
71-
builder.add(new ProjectAggregateExpressionsForCse());
72-
}
73-
builder.add(new CommonSubExpressionOpt());
70+
// if (cascadesContext.getConnectContext().getSessionVariable().enableAggregateCse) {
71+
// builder.add(new ProjectAggregateExpressionsForCse());
72+
// }
73+
// builder.add(new CommonSubExpressionOpt());
7474
// DO NOT replace PLAN NODE from here
7575
if (cascadesContext.getConnectContext().getSessionVariable().pushTopnToAgg) {
7676
builder.add(new PushTopnToAgg());

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ public enum RuleType {
313313
PUSH_CONJUNCTS_INTO_JDBC_SCAN(RuleTypeClass.REWRITE),
314314
PUSH_CONJUNCTS_INTO_ODBC_SCAN(RuleTypeClass.REWRITE),
315315
PUSH_CONJUNCTS_INTO_ES_SCAN(RuleTypeClass.REWRITE),
316+
PUSH_DOWN_VIRTUAL_COLUMNS_INTO_OLAP_SCAN(RuleTypeClass.REWRITE),
316317
OLAP_SCAN_TABLET_PRUNE(RuleTypeClass.REWRITE),
317318
PUSH_AGGREGATE_TO_OLAP_SCAN(RuleTypeClass.REWRITE),
318319
PUSH_COUNT_INTO_UNION_ALL(RuleTypeClass.REWRITE),

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public Rule build() {
6262
Optional.empty(),
6363
olapScan.getLogicalProperties(),
6464
olapScan.getTableSample(),
65-
olapScan.getOperativeSlots())
65+
olapScan.getOperativeSlots(),
66+
olapScan.getVirtualColumns())
6667
).toRule(RuleType.LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE);
6768
}
6869

@@ -116,8 +117,8 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
116117
// If the length of the column in the bucket key changes after DDL, the length cannot be
117118
// determined. As a result, some bucket fields are lost in the query execution plan.
118119
// So here we use the column name to avoid this problem
119-
if (((SlotReference) slot).getOriginalColumn().get().getName()
120-
.equalsIgnoreCase(column.getName())) {
120+
// Guard the Optional that is actually dereferenced below. The previous check tested
// getColumn() but then called getOriginalColumn().get(), which throws
// NoSuchElementException when the original column is absent.
if (((SlotReference) slot).getOriginalColumn().isPresent()
        && ((SlotReference) slot).getOriginalColumn().get()
        .getName().equalsIgnoreCase(column.getName())) {
121122
hashColumns.add(slot.getExprId());
122123
}
123124
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.nereids.rules.rewrite;
19+
20+
import org.apache.doris.nereids.processor.post.CommonSubExpressionCollector;
21+
import org.apache.doris.nereids.processor.post.CommonSubExpressionOpt.ExpressionReplacer;
22+
import org.apache.doris.nereids.rules.Rule;
23+
import org.apache.doris.nereids.rules.RuleType;
24+
import org.apache.doris.nereids.trees.expressions.Alias;
25+
import org.apache.doris.nereids.trees.expressions.Expression;
26+
import org.apache.doris.nereids.trees.expressions.NamedExpression;
27+
import org.apache.doris.nereids.trees.expressions.Slot;
28+
import org.apache.doris.nereids.trees.expressions.WhenClause;
29+
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
30+
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
31+
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
32+
import org.apache.doris.nereids.util.ExpressionUtils;
33+
34+
import com.google.common.collect.Lists;
35+
import com.google.common.collect.Maps;
36+
37+
import java.util.LinkedHashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Set;
41+
42+
/**
43+
* extract virtual column from filter and push down them into olap scan.
44+
*/
45+
public class PushDownVirualColumnsIntoOlapScan extends OneRewriteRuleFactory {
46+
47+
@Override
48+
public Rule build() {
49+
return logicalProject(logicalFilter(logicalOlapScan()
50+
.when(s -> s.getVirtualColumns().isEmpty())))
51+
.then(project -> {
52+
// 1. extract filter common expr
53+
// 2. generate virtual column from common expr and add them to scan
54+
// 3. replace filter
55+
// 4. replace project
56+
LogicalFilter<LogicalOlapScan> filter = project.child();
57+
LogicalOlapScan logicalOlapScan = filter.child();
58+
CommonSubExpressionCollector collector = new CommonSubExpressionCollector();
59+
for (Expression expr : filter.getConjuncts()) {
60+
collector.collect(expr);
61+
}
62+
Map<Expression, Alias> aliasMap = new LinkedHashMap<>();
63+
if (!collector.commonExprByDepth.isEmpty()) {
64+
for (int i = 1; i <= collector.commonExprByDepth.size(); i++) {
65+
Set<Expression> exprsInDepth = CommonSubExpressionCollector
66+
.getExpressionsFromDepthMap(i, collector.commonExprByDepth);
67+
exprsInDepth.forEach(expr -> {
68+
if (!(expr instanceof WhenClause)) {
69+
// case whenClause1 whenClause2 END
70+
// whenClause should not be regarded as common-sub-expression, because
71+
// cse will be replaced by a slot, after rewrite the case clause becomes:
72+
// 'case slot whenClause2 END'
73+
// This is illegal.
74+
Expression rewritten = expr.accept(ExpressionReplacer.INSTANCE, aliasMap);
75+
// if rewritten is already alias, use it directly,
76+
// because in materialized view rewriting
77+
// Should keep out slot immutably after rewritten successfully
78+
aliasMap.put(expr, rewritten instanceof Alias
79+
? (Alias) rewritten : new Alias(rewritten));
80+
}
81+
});
82+
}
83+
}
84+
List<NamedExpression> virtualColumns = Lists.newArrayList();
85+
Map<Expression, Slot> replaceMap = Maps.newHashMap();
86+
for (Map.Entry<Expression, Alias> entry : aliasMap.entrySet()) {
87+
Alias alias = entry.getValue();
88+
replaceMap.put(entry.getKey(), alias.toSlot());
89+
virtualColumns.add(alias);
90+
}
91+
logicalOlapScan = logicalOlapScan.withVirtualColumns(virtualColumns);
92+
Set<Expression> conjuncts = ExpressionUtils.replace(filter.getConjuncts(), replaceMap);
93+
List<NamedExpression> projections = ExpressionUtils.replace(
94+
(List) project.getProjects(), replaceMap);
95+
LogicalFilter<?> newFilter = filter.withConjunctsAndChild(conjuncts, logicalOlapScan);
96+
LogicalProject<?> newProject = project.withProjectsAndChild(projections, newFilter);
97+
return newProject;
98+
}).toRule(RuleType.PUSH_DOWN_VIRTUAL_COLUMNS_INTO_OLAP_SCAN);
99+
}
100+
}

0 commit comments

Comments
 (0)