-
Notifications
You must be signed in to change notification settings - Fork 200
[WIP] Add support for incremental rewrite of nested queries #400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
a80f590
69fef78
8289b6b
88fa70d
c3caa1a
00bc8cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| /** | ||
| * Copyright 2023 LinkedIn Corporation. All rights reserved. | ||
| * Licensed under the BSD-2 Clause license. | ||
| * See LICENSE in the project root for license information. | ||
| */ | ||
| package com.linkedin.coral.incremental; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| import org.apache.calcite.rel.RelNode; | ||
|
|
||
|
|
||
| public class IncrementalTransformerResults { | ||
|
|
||
| private RelNode incrementalRelNode; | ||
| private RelNode refreshRelNode; | ||
| private Map<String, RelNode> intermediateQueryRelNodes; | ||
AliceYeh12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| public IncrementalTransformerResults() { | ||
| incrementalRelNode = null; | ||
| refreshRelNode = null; | ||
|
||
| intermediateQueryRelNodes = new HashMap<>(); | ||
| } | ||
|
|
||
| public boolean existsIncrementalRelNode() { | ||
| return incrementalRelNode != null; | ||
| } | ||
|
|
||
| public RelNode getIncrementalRelNode() { | ||
| return incrementalRelNode; | ||
| } | ||
|
|
||
| public boolean existsRefreshRelNode() { | ||
| return refreshRelNode != null; | ||
| } | ||
|
|
||
| public RelNode getRefreshRelNode() { | ||
| return refreshRelNode; | ||
| } | ||
|
|
||
| public Map<String, RelNode> getIntermediateQueryRelNodes() { | ||
| return intermediateQueryRelNodes; | ||
| } | ||
|
|
||
| public boolean containsIntermediateQueryRelNodeKey(String name) { | ||
| return intermediateQueryRelNodes.containsKey(name); | ||
| } | ||
|
|
||
| public RelNode getIntermediateQueryRelNodeCorrespondingToKey(String name) { | ||
| return intermediateQueryRelNodes.get(name); | ||
| } | ||
|
|
||
| public void setIncrementalRelNode(RelNode incrementalRelNode) { | ||
| this.incrementalRelNode = incrementalRelNode; | ||
| } | ||
|
|
||
| public void setRefreshRelNode(RelNode refreshRelNode) { | ||
| this.refreshRelNode = refreshRelNode; | ||
| } | ||
|
|
||
| public void setIntermediateQueryRelNodes(Map<String, RelNode> intermediateQueryRelNodes) { | ||
| this.intermediateQueryRelNodes = intermediateQueryRelNodes; | ||
| } | ||
|
|
||
| public void addIntermediateQueryRelNode(String name, RelNode intermediateRelNode) { | ||
| this.intermediateQueryRelNodes.put(name, intermediateRelNode); | ||
| } | ||
|
|
||
| public void addMultipleIntermediateQueryRelNodes(Map<String, RelNode> intermediateQueryRelNodes) { | ||
AliceYeh12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (intermediateQueryRelNodes != null) { | ||
| this.intermediateQueryRelNodes.putAll(intermediateQueryRelNodes); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,10 +7,12 @@ | |
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.IntStream; | ||
|
|
||
| import org.apache.calcite.plan.RelOptSchema; | ||
| import org.apache.calcite.plan.RelOptTable; | ||
| import org.apache.calcite.prepare.RelOptTableImpl; | ||
| import org.apache.calcite.rel.RelNode; | ||
|
|
@@ -29,14 +31,29 @@ | |
|
|
||
| public class RelNodeIncrementalTransformer { | ||
|
|
||
| private static RelOptSchema relOptSchema; | ||
|
|
||
| private RelNodeIncrementalTransformer() { | ||
| } | ||
|
|
||
| public static RelNode convertRelIncremental(RelNode originalNode) { | ||
| public static IncrementalTransformerResults performIncrementalTransformation(RelNode originalNode) { | ||
| IncrementalTransformerResults incrementalTransformerResults = convertRelIncremental(originalNode); | ||
| return incrementalTransformerResults; | ||
| } | ||
|
|
||
| private static IncrementalTransformerResults convertRelIncremental(RelNode originalNode) { | ||
| IncrementalTransformerResults incrementalTransformerResults = new IncrementalTransformerResults(); | ||
| RelShuttle converter = new RelShuttleImpl() { | ||
| @Override | ||
| public RelNode visit(TableScan scan) { | ||
| RelOptTable originalTable = scan.getTable(); | ||
|
|
||
| // Set relOptSchema | ||
| if (relOptSchema == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the use case for this check? when will
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is to set |
||
| relOptSchema = originalTable.getRelOptSchema(); | ||
| } | ||
|
|
||
| // Create delta scan | ||
| List<String> incrementalNames = new ArrayList<>(originalTable.getQualifiedName()); | ||
| String deltaTableName = incrementalNames.remove(incrementalNames.size() - 1) + "_delta"; | ||
| incrementalNames.add(deltaTableName); | ||
|
|
@@ -49,11 +66,35 @@ public RelNode visit(TableScan scan) { | |
| public RelNode visit(LogicalJoin join) { | ||
| RelNode left = join.getLeft(); | ||
| RelNode right = join.getRight(); | ||
| RelNode incrementalLeft = convertRelIncremental(left); | ||
| RelNode incrementalRight = convertRelIncremental(right); | ||
| IncrementalTransformerResults incrementalTransformerResultsLeft = convertRelIncremental(left); | ||
| IncrementalTransformerResults incrementalTransformerResultsRight = convertRelIncremental(right); | ||
| RelNode incrementalLeft = incrementalTransformerResultsLeft.getIncrementalRelNode(); | ||
| RelNode incrementalRight = incrementalTransformerResultsRight.getIncrementalRelNode(); | ||
| incrementalTransformerResults | ||
|
||
| .addMultipleIntermediateQueryRelNodes(incrementalTransformerResultsLeft.getIntermediateQueryRelNodes()); | ||
| incrementalTransformerResults | ||
| .addMultipleIntermediateQueryRelNodes(incrementalTransformerResultsRight.getIntermediateQueryRelNodes()); | ||
|
|
||
| RexBuilder rexBuilder = join.getCluster().getRexBuilder(); | ||
|
|
||
| // Check if we can replace the left and right nodes with a scan of a materialized table | ||
AliceYeh12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (incrementalTransformerResults.containsIntermediateQueryRelNodeKey(left.getDescription())) { | ||
| String description = left.getDescription(); | ||
| LogicalProject leftLastProject = createReplacementProjectNodeForGivenRelNode(description, left, rexBuilder); | ||
| left = leftLastProject; | ||
| LogicalProject leftDeltaProject = | ||
| createReplacementProjectNodeForGivenRelNode(description + "_delta", incrementalLeft, rexBuilder); | ||
| incrementalLeft = leftDeltaProject; | ||
| } | ||
| if (incrementalTransformerResults.containsIntermediateQueryRelNodeKey(right.getDescription())) { | ||
| LogicalProject rightLastProject = | ||
| createReplacementProjectNodeForGivenRelNode(right.getDescription(), right, rexBuilder); | ||
| right = rightLastProject; | ||
| LogicalProject rightDeltaProject = createReplacementProjectNodeForGivenRelNode( | ||
| right.getDescription() + "_delta", incrementalRight, rexBuilder); | ||
| incrementalRight = rightDeltaProject; | ||
| } | ||
|
|
||
| LogicalProject p1 = createProjectOverJoin(join, left, incrementalRight, rexBuilder); | ||
| LogicalProject p2 = createProjectOverJoin(join, incrementalLeft, right, rexBuilder); | ||
| LogicalProject p3 = createProjectOverJoin(join, incrementalLeft, incrementalRight, rexBuilder); | ||
|
|
@@ -65,45 +106,78 @@ public RelNode visit(LogicalJoin join) { | |
|
|
||
| @Override | ||
| public RelNode visit(LogicalFilter filter) { | ||
| RelNode transformedChild = convertRelIncremental(filter.getInput()); | ||
| IncrementalTransformerResults incrementalTransformerResultsChild = convertRelIncremental(filter.getInput()); | ||
| RelNode transformedChild = incrementalTransformerResultsChild.getIncrementalRelNode(); | ||
| incrementalTransformerResults | ||
| .addMultipleIntermediateQueryRelNodes(incrementalTransformerResultsChild.getIntermediateQueryRelNodes()); | ||
| return LogicalFilter.create(transformedChild, filter.getCondition()); | ||
| } | ||
|
|
||
| @Override | ||
| public RelNode visit(LogicalProject project) { | ||
| RelNode transformedChild = convertRelIncremental(project.getInput()); | ||
| return LogicalProject.create(transformedChild, project.getProjects(), project.getRowType()); | ||
| IncrementalTransformerResults incrementalTransformerResultsChild = convertRelIncremental(project.getInput()); | ||
| RelNode transformedChild = incrementalTransformerResultsChild.getIncrementalRelNode(); | ||
| incrementalTransformerResults | ||
| .addMultipleIntermediateQueryRelNodes(incrementalTransformerResultsChild.getIntermediateQueryRelNodes()); | ||
| incrementalTransformerResults.addIntermediateQueryRelNode(project.getDescription(), project); | ||
| LogicalProject transformedProject = | ||
| LogicalProject.create(transformedChild, project.getProjects(), project.getRowType()); | ||
| incrementalTransformerResults.addIntermediateQueryRelNode(project.getDescription() + "_delta", | ||
| transformedProject); | ||
| return transformedProject; | ||
| } | ||
|
|
||
| @Override | ||
| public RelNode visit(LogicalUnion union) { | ||
| List<RelNode> children = union.getInputs(); | ||
| List<RelNode> transformedChildren = | ||
| List<IncrementalTransformerResults> incrementalTransformerResultsChildren = | ||
| children.stream().map(child -> convertRelIncremental(child)).collect(Collectors.toList()); | ||
| List<RelNode> transformedChildren = new ArrayList<>(); | ||
| for (IncrementalTransformerResults incrementalTransformerResultsChild : incrementalTransformerResultsChildren) { | ||
| transformedChildren.add(incrementalTransformerResultsChild.getIncrementalRelNode()); | ||
| incrementalTransformerResults | ||
| .addMultipleIntermediateQueryRelNodes(incrementalTransformerResultsChild.getIntermediateQueryRelNodes()); | ||
| } | ||
| return LogicalUnion.create(transformedChildren, union.all); | ||
| } | ||
|
|
||
| @Override | ||
| public RelNode visit(LogicalAggregate aggregate) { | ||
| RelNode transformedChild = convertRelIncremental(aggregate.getInput()); | ||
| IncrementalTransformerResults incrementalTransformerResultsChild = convertRelIncremental(aggregate.getInput()); | ||
| RelNode transformedChild = incrementalTransformerResultsChild.getIncrementalRelNode(); | ||
| incrementalTransformerResults | ||
| .addMultipleIntermediateQueryRelNodes(incrementalTransformerResultsChild.getIntermediateQueryRelNodes()); | ||
| return LogicalAggregate.create(transformedChild, aggregate.getGroupSet(), aggregate.getGroupSets(), | ||
| aggregate.getAggCallList()); | ||
| } | ||
| }; | ||
| return originalNode.accept(converter); | ||
| incrementalTransformerResults.setIncrementalRelNode(originalNode.accept(converter)); | ||
| return incrementalTransformerResults; | ||
| } | ||
|
|
||
| private static LogicalProject createProjectOverJoin(LogicalJoin join, RelNode left, RelNode right, | ||
| private static LogicalProject createReplacementProjectNodeForGivenRelNode(String relOptTableName, RelNode relNode, | ||
AliceYeh12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| RexBuilder rexBuilder) { | ||
| LogicalJoin incrementalJoin = | ||
| LogicalJoin.create(left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType()); | ||
| RelOptTable table = | ||
| RelOptTableImpl.create(relOptSchema, relNode.getRowType(), Collections.singletonList(relOptTableName), null); | ||
| TableScan scan = LogicalTableScan.create(relNode.getCluster(), table); | ||
| return createProjectOverNode(scan, rexBuilder); | ||
| } | ||
|
|
||
| private static LogicalProject createProjectOverNode(RelNode relNode, RexBuilder rexBuilder) { | ||
AliceYeh12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ArrayList<RexNode> projects = new ArrayList<>(); | ||
| ArrayList<String> names = new ArrayList<>(); | ||
| IntStream.range(0, incrementalJoin.getRowType().getFieldList().size()).forEach(i -> { | ||
| projects.add(rexBuilder.makeInputRef(incrementalJoin, i)); | ||
| names.add(incrementalJoin.getRowType().getFieldNames().get(i)); | ||
| IntStream.range(0, relNode.getRowType().getFieldList().size()).forEach(i -> { | ||
| projects.add(rexBuilder.makeInputRef(relNode, i)); | ||
| names.add(relNode.getRowType().getFieldNames().get(i)); | ||
| }); | ||
| return LogicalProject.create(incrementalJoin, projects, names); | ||
| return LogicalProject.create(relNode, projects, names); | ||
| } | ||
|
|
||
| private static LogicalProject createProjectOverJoin(LogicalJoin join, RelNode left, RelNode right, | ||
| RexBuilder rexBuilder) { | ||
| LogicalJoin incrementalJoin = | ||
| LogicalJoin.create(left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType()); | ||
| return createProjectOverNode(incrementalJoin, rexBuilder); | ||
| } | ||
|
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.