[FLINK-37403] Convert interval join to regular join for updating source #26230

Open · wants to merge 2 commits into master

Changes from all commits
@@ -21,6 +21,9 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate;
@@ -46,13 +49,16 @@
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowTableAggregate;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
import org.apache.flink.table.planner.plan.trait.RelWindowProperties;
import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

import org.apache.calcite.plan.RelOptUtil;
@@ -108,9 +114,11 @@
public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {

private final RexBuilder rexBuilder;
private Set<RowKind> srcChangelogModeRowKinds;

private RelTimeIndicatorConverter(RexBuilder rexBuilder) {
this.rexBuilder = rexBuilder;
this.srcChangelogModeRowKinds = new HashSet<>();
}

public static RelNode convert(
@@ -128,9 +136,27 @@ public static RelNode convert(
return converter.materializeProcTime(convertedRoot);
}

private ChangelogMode extractChangelogModeFromSource(RelNode scan) {
if (scan.getTable() instanceof TableSourceTable) {
TableSourceTable tableSourceTable = (TableSourceTable) scan.getTable();
DynamicTableSource tableSource = tableSourceTable.tableSource();
if (tableSource instanceof ScanTableSource) {
ScanTableSource scanSource = (ScanTableSource) tableSource;
return scanSource.getChangelogMode();
} else {
return ChangelogMode.insertOnly();
}
}

// Default to insert-only
return ChangelogMode.insertOnly();
}

@Override
public RelNode visit(RelNode node) {
if (node instanceof FlinkLogicalValues || node instanceof TableScan) {
final ChangelogMode changelogMode = extractChangelogModeFromSource(node);
this.srcChangelogModeRowKinds.addAll(changelogMode.getContainedKinds());
return node;
} else if (node instanceof FlinkLogicalIntersect
|| node instanceof FlinkLogicalUnion
@@ -288,6 +314,15 @@ private RelNode visitJoin(FlinkLogicalJoin join) {
// materialize time attribute fields of regular join's inputs
newLeft = materializeTimeIndicators(newLeft);
newRight = materializeTimeIndicators(newRight);
} else if (IntervalJoinUtil.satisfyIntervalJoin(join, newLeft, newRight)) {
// if the source contains updates, we need to materialize the time attributes
boolean containsInsertOnly =
srcChangelogModeRowKinds.size() == 1
&& srcChangelogModeRowKinds.contains(RowKind.INSERT);
if (!containsInsertOnly) {
newLeft = materializeTimeIndicators(newLeft);
newRight = materializeTimeIndicators(newRight);
}
}
List<RelDataTypeField> leftRightFields = new ArrayList<>();
leftRightFields.addAll(newLeft.getRowType().getFieldList());
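
For reference, a minimal, self-contained sketch of the insert-only check above (assuming flink-table-common and flink-core on the classpath; the class name is illustrative). ChangelogMode.containsOnly expresses the same condition as the size-and-contains test on the collected row kinds:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class InsertOnlyGate {
    public static void main(String[] args) {
        // An insert-only source keeps interval-join semantics intact.
        ChangelogMode insertOnly = ChangelogMode.insertOnly();
        // An upserting source (I, UA, D) forces materialization of time attributes.
        ChangelogMode upsert = ChangelogMode.upsert();

        System.out.println(insertOnly.containsOnly(RowKind.INSERT)); // true
        System.out.println(upsert.containsOnly(RowKind.INSERT));     // false
    }
}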
@@ -247,9 +247,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
createNewNode(cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate =>
// TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only
case _: StreamPhysicalTemporalSort | _: StreamPhysicalOverAggregate |
_: StreamPhysicalPythonOverAggregate =>
// TemporalSort and OverAggregate only support consuming insert-only
// and producing insert-only changes
val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
@@ -287,6 +287,39 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val leftTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
createNewNode(temporalJoin, children, leftTrait, requiredTrait, requester)

case intervalJoin: StreamPhysicalIntervalJoin =>
// IntervalJoin only supports consuming insert-only and producing insert-only changes
// However, when the input contains updates, convert it to a regular join
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
val inputModifyKindSet = ModifyKindSet.union(children.map(getModifyKindSet): _*)
val containsUpdatesOrDeletes = inputModifyKindSet.contains(
ModifyKind.UPDATE) || inputModifyKindSet.contains(ModifyKind.DELETE)
if (containsUpdatesOrDeletes) {
// Convert to a regular join if the input contains updates or deletes
val isInnerJoin = intervalJoin.joinSpec.getJoinType == FlinkJoinType.INNER
Contributor:

why only inner and not all?

Contributor Author:

This is the same as the RegularJoin condition. RegularJoin has conditions for INNER and SEMI; however, IntervalJoin doesn't support SEMI.
Ref: https://github.com/apache/flink/pull/26230/files#diff-ddf95eb3949ab889e8e3bccdacb9f57553aac06657574fd6316ca17dd48321a1R262

val providedTrait = if (isInnerJoin) {
// forward left and right modify operations
val leftKindSet = getModifyKindSet(children.head)
val rightKindSet = getModifyKindSet(children.last)
new ModifyKindSetTrait(leftKindSet.union(rightKindSet))
} else {
// otherwise, it may produce any kinds of changes
ModifyKindSetTrait.ALL_CHANGES
}
val regularJoin = new StreamPhysicalJoin(
rel.getCluster,
rel.getTraitSet,
intervalJoin.getLeft,
intervalJoin.getRight,
intervalJoin.getCondition,
intervalJoin.getJoinType,
intervalJoin.getHints)
createNewNode(regularJoin, children, providedTrait, requiredTrait, requester)
Contributor:

It might be that this approach has one major issue. What is the data type of the timestamp column? Is it still marked as a time attribute or is it a regular column? If it is still marked as a time attribute, windows or subsequent interval joins would still be allowed by the planner. Of course the output would be garbage in this case.

Contributor Author (@bvarghese1, Mar 4, 2025):

Added a commit to materialize time attributes while processing IntervalJoin if the source has updates.
This makes the conversion in FlinkChangelogModeInferenceProgram redundant.

} else {
// IntervalJoin can only support insert-only input
createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
}
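
To summarize the case above, a hypothetical, self-contained model of the trait derivation (the enum is a stand-in for the planner's ModifyKindSet, not Flink's actual class): an inner join forwards the union of its inputs' change kinds, while any other join type may produce all kinds of changes.

import java.util.EnumSet;

public class ProvidedTraitModel {
    // Stand-in for the planner's ModifyKind; illustrative only.
    enum ModifyKind { INSERT, UPDATE, DELETE }

    static EnumSet<ModifyKind> providedKinds(
            boolean isInnerJoin, EnumSet<ModifyKind> left, EnumSet<ModifyKind> right) {
        if (isInnerJoin) {
            // forward left and right modify operations
            EnumSet<ModifyKind> union = EnumSet.copyOf(left);
            union.addAll(right);
            return union;
        }
        // otherwise, the join may produce any kind of change
        return EnumSet.allOf(ModifyKind.class);
    }

    public static void main(String[] args) {
        // Insert-only left input joined with an updating right input:
        System.out.println(providedKinds(
                true,
                EnumSet.of(ModifyKind.INSERT),
                EnumSet.of(ModifyKind.INSERT, ModifyKind.UPDATE, ModifyKind.DELETE)));
        // prints [INSERT, UPDATE, DELETE]
    }
}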

case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
_: StreamPhysicalLookupJoin | _: StreamPhysicalExchange | _: StreamPhysicalExpand |
_: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner |
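
Ahead of the test changes below, a minimal sketch (assumed table definitions; the testing 'values' connector comes from the flink-table-planner test jar) of how the materialization discussed in the thread above can be observed. With an updating input, the join output's rowtime columns should print as plain TIMESTAMP(3), without the *ROWTIME* time-attribute marker, so the planner rejects subsequent windows or interval joins on them:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class MaterializedRowtimeCheck {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Insert-only source.
        env.executeSql(
                "CREATE TABLE Orders (currency STRING, amount INT, rowtime TIMESTAMP(3),"
                        + " WATERMARK FOR rowtime AS rowtime)"
                        + " WITH ('connector' = 'values')");
        // Updating source: the changelog also contains UPDATE_BEFORE/UPDATE_AFTER rows.
        env.executeSql(
                "CREATE TABLE Rates (currency STRING, rate INT, rowtime TIMESTAMP(3),"
                        + " WATERMARK FOR rowtime AS rowtime)"
                        + " WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,UB')");
        Table joined = env.sqlQuery(
                "SELECT o.rowtime AS l_rowtime, r.rowtime AS r_rowtime"
                        + " FROM Orders o JOIN Rates r ON o.currency = r.currency"
                        + " WHERE o.rowtime BETWEEN r.rowtime - INTERVAL '5' SECOND"
                        + " AND r.rowtime + INTERVAL '5' SECOND");
        // Expected: both columns print without the *ROWTIME* marker.
        joined.printSchema();
    }
}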
@@ -35,7 +35,13 @@ public IntervalJoinRestoreTest() {
public List<TableTestProgram> programs() {
return Arrays.asList(
IntervalJoinTestPrograms.INTERVAL_JOIN_EVENT_TIME,
IntervalJoinTestPrograms.INTERVAL_JOIN_EVENT_TIME_UPDATING_SOURCE,
Contributor:

We don't need a restore test; restoring doesn't need to be tested.
I recently added a SemanticTestBase to test only the execution; take this as an example: #26236

IntervalJoinTestPrograms
.INTERVAL_JOIN_EVENT_TIME_UPDATING_SOURCE_AND_SINK_MATERIALIZE,
IntervalJoinTestPrograms.INTERVAL_JOIN_PROC_TIME,
IntervalJoinTestPrograms.INTERVAL_JOIN_NEGATIVE_INTERVAL);
IntervalJoinTestPrograms.INTERVAL_JOIN_PROC_TIME_UPDATING_SOURCE,
IntervalJoinTestPrograms.INTERVAL_JOIN_NEGATIVE_INTERVAL,
IntervalJoinTestPrograms.INTERVAL_JOIN_NEGATIVE_INTERVAL_UPDATING_SOURCE,
IntervalJoinTestPrograms.INTERVAL_JOIN_NON_TIME_ATTRIBUTE);
}
}

Large diffs are not rendered by default.

@@ -258,6 +258,78 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti
]]>
</Resource>
</TestCase>
<TestCase name="testIntervalJoin">
<Resource name="sql">
<![CDATA[
SELECT * FROM Orders AS o
JOIN ratesHistory AS r
ON o.currency = r.currency
WHERE o.rowtime BETWEEN r.rowtime - INTERVAL '5' SECOND AND r.rowtime + INTERVAL '5' SECOND
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[$3], currency0=[$4], rate=[$5], rowtime0=[$6])
+- LogicalFilter(condition=[AND(>=($2, -($6, 5000:INTERVAL SECOND)), <=($2, +($6, 5000:INTERVAL SECOND)))])
+- LogicalJoin(condition=[=($1, $4)], joinType=[inner])
:- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
: +- LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[PROCTIME()])
: +- LogicalTableScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, ratesHistory, source: [CollectionTableSource(currency, rate, rowtime)]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, currency0, rate, rowtime0], changelogMode=[I])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=5000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(currency, currency0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 5000:INTERVAL SECOND)))], select=[amount, currency, rowtime, proctime, currency0, rate, rowtime0], changelogMode=[I])
:- Exchange(distribution=[hash[currency]], changelogMode=[I])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I])
: +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]], fields=[amount, currency, rowtime], changelogMode=[I])
+- Exchange(distribution=[hash[currency]], changelogMode=[I])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, ratesHistory, source: [CollectionTableSource(currency, rate, rowtime)]]], fields=[currency, rate, rowtime], changelogMode=[I])
]]>
</Resource>
</TestCase>
<TestCase name="testIntervalJoinToRegularJoin">
<Resource name="sql">
<![CDATA[
SELECT * FROM Orders AS o
JOIN ratesChangelogStream AS r
ON o.currency = r.currency
WHERE o.rowtime BETWEEN r.rowtime - INTERVAL '5' SECOND AND r.rowtime + INTERVAL '5' SECOND
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[$3], currency0=[$4], rate=[$5], rowtime0=[$6])
+- LogicalFilter(condition=[AND(>=($2, -($6, 5000:INTERVAL SECOND)), <=($2, +($6, 5000:INTERVAL SECOND)))])
+- LogicalJoin(condition=[=($1, $4)], joinType=[inner])
:- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
: +- LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[PROCTIME()])
: +- LogicalTableScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, ratesChangelogStream]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Join(joinType=[InnerJoin], where=[AND(=(currency, currency0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 5000:INTERVAL SECOND)))], select=[amount, currency, rowtime, proctime, currency0, rate, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
:- Exchange(distribution=[hash[currency]], changelogMode=[I])
: +- Calc(select=[amount, currency, CAST(rowtime AS TIMESTAMP(3)) AS rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime], changelogMode=[I])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I])
: +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]], fields=[amount, currency, rowtime], changelogMode=[I])
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+- Calc(select=[currency, rate, CAST(rowtime AS TIMESTAMP(3)) AS rowtime], changelogMode=[I,UA,D])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
+- DropUpdateBefore(changelogMode=[I,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, ratesChangelogStream]], fields=[currency, rate, rowtime], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
<TestCase name="testTwoLevelGroupByLocalGlobalOn">
<Resource name="sql">
<![CDATA[
@@ -61,6 +61,44 @@ Calc(select=[a])
]]>
</Resource>
</TestCase>
<TestCase name="testIntervalJoinToRegularJoin">
<Resource name="sql">
<![CDATA[
SELECT t4.a FROM MyTable4 t4
JOIN MyTable5 t5
ON t4.a = t5.a
AND t4.rowtime BETWEEN t5.rowtime - INTERVAL '5' SECOND AND t5.rowtime + INTERVAL '5' SECOND
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
+- LogicalJoin(condition=[AND(=($0, $4), >=($3, -($7, 5000:INTERVAL SECOND)), <=($3, +($7, 5000:INTERVAL SECOND)))], joinType=[inner])
:- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
: +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[TO_TIMESTAMP_LTZ($1, 3)])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable4]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[TO_TIMESTAMP_LTZ($1, 3)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable5]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[((a = a0) AND (rowtime >= (rowtime0 - 5000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 5000:INTERVAL SECOND)))], select=[a, rowtime, a0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, CAST(rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS rowtime])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
: +- Calc(select=[a, TO_TIMESTAMP_LTZ(b, 3) AS rowtime])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable4, project=[a, b], metadata=[]]], fields=[a, b])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, CAST(rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS rowtime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+- Calc(select=[a, TO_TIMESTAMP_LTZ(b, 3) AS rowtime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable5, project=[a, b], metadata=[]]], fields=[a, b])
]]>
</Resource>
</TestCase>
<TestCase name="testInteravalDiffTimeIndicator">
<Resource name="sql">
<![CDATA[

Large diffs are not rendered by default.

Binary file not shown.

Large diffs are not rendered by default.

Binary file not shown.

Large diffs are not rendered by default.

Binary file not shown.
@@ -0,0 +1,396 @@
{
"flinkVersion" : "2.1",
"nodes" : [ {
"id" : 45,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`orders_t`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "id",
"dataType" : "INT"
}, {
"name" : "order_ts_str",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "order_ts",
"kind" : "COMPUTED",
"expression" : {
"rexNode" : {
"kind" : "CALL",
"internalName" : "$TO_TIMESTAMP$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
} ],
"type" : "TIMESTAMP(3)"
},
"serializableString" : "TO_TIMESTAMP(`order_ts_str`)"
}
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ]
}
}
},
"outputType" : "ROW<`id` INT, `order_ts_str` VARCHAR(2147483647)>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t]], fields=[id, order_ts_str])",
"inputProperties" : [ ]
}, {
"id" : 46,
"type" : "stream-exec-calc_1",
"projection" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : "INT"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
}, {
"kind" : "CALL",
"internalName" : "$TO_TIMESTAMP$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
} ],
"type" : "TIMESTAMP(3)"
} ],
"condition" : null,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` INT, `order_ts_str` VARCHAR(2147483647), `order_ts` TIMESTAMP(3)>",
"description" : "Calc(select=[id, order_ts_str, TO_TIMESTAMP(order_ts_str) AS order_ts])"
}, {
"id" : 47,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "HASH",
"keys" : [ 0 ]
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` INT, `order_ts_str` VARCHAR(2147483647), `order_ts` TIMESTAMP(3)>",
"description" : "Exchange(distribution=[hash[id]])"
}, {
"id" : 48,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`shipments_t`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "id",
"dataType" : "INT"
}, {
"name" : "order_id",
"dataType" : "INT"
}, {
"name" : "shipment_ts_str",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "shipment_ts",
"kind" : "COMPUTED",
"expression" : {
"rexNode" : {
"kind" : "CALL",
"internalName" : "$TO_TIMESTAMP$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 2,
"type" : "VARCHAR(2147483647)"
} ],
"type" : "TIMESTAMP(3)"
},
"serializableString" : "TO_TIMESTAMP(`shipment_ts_str`)"
}
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ]
}
},
"abilities" : [ {
"type" : "ProjectPushDown",
"projectedFields" : [ [ 1 ], [ 2 ] ],
"producedType" : "ROW<`order_id` INT, `shipment_ts_str` VARCHAR(2147483647)> NOT NULL"
}, {
"type" : "ReadingMetadata",
"metadataKeys" : [ ],
"producedType" : "ROW<`order_id` INT, `shipment_ts_str` VARCHAR(2147483647)> NOT NULL"
} ]
},
"outputType" : "ROW<`order_id` INT, `shipment_ts_str` VARCHAR(2147483647)>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, shipments_t, project=[order_id, shipment_ts_str], metadata=[]]], fields=[order_id, shipment_ts_str])",
"inputProperties" : [ ]
}, {
"id" : 49,
"type" : "stream-exec-calc_1",
"projection" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : "INT"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
}, {
"kind" : "CALL",
"internalName" : "$TO_TIMESTAMP$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
} ],
"type" : "TIMESTAMP(3)"
} ],
"condition" : null,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`order_id` INT, `shipment_ts_str` VARCHAR(2147483647), `shipment_ts` TIMESTAMP(3)>",
"description" : "Calc(select=[order_id, shipment_ts_str, TO_TIMESTAMP(shipment_ts_str) AS shipment_ts])"
}, {
"id" : 50,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "HASH",
"keys" : [ 0 ]
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`order_id` INT, `shipment_ts_str` VARCHAR(2147483647), `shipment_ts` TIMESTAMP(3)>",
"description" : "Exchange(distribution=[hash[order_id]])"
}, {
"id" : 51,
"type" : "stream-exec-join_1",
"joinSpec" : {
"joinType" : "INNER",
"leftKeys" : [ 0 ],
"rightKeys" : [ 0 ],
"filterNulls" : [ true ],
"nonEquiCondition" : {
"kind" : "CALL",
"syntax" : "BINARY",
"internalName" : "$AND$1",
"operands" : [ {
"kind" : "CALL",
"syntax" : "BINARY",
"internalName" : "$>=$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 2,
"type" : "TIMESTAMP(3)"
}, {
"kind" : "CALL",
"syntax" : "SPECIAL",
"internalName" : "$-$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 5,
"type" : "TIMESTAMP(3)"
}, {
"kind" : "LITERAL",
"value" : "5000",
"type" : "INTERVAL SECOND(6) NOT NULL"
} ],
"type" : "TIMESTAMP(3)"
} ],
"type" : "BOOLEAN"
}, {
"kind" : "CALL",
"syntax" : "BINARY",
"internalName" : "$<=$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 2,
"type" : "TIMESTAMP(3)"
}, {
"kind" : "CALL",
"syntax" : "SPECIAL",
"internalName" : "$+$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 5,
"type" : "TIMESTAMP(3)"
}, {
"kind" : "LITERAL",
"value" : "5000",
"type" : "INTERVAL SECOND(6) NOT NULL"
} ],
"type" : "TIMESTAMP(3)"
} ],
"type" : "BOOLEAN"
} ],
"type" : "BOOLEAN"
}
},
"state" : [ {
"index" : 0,
"ttl" : "0 ms",
"name" : "leftState"
}, {
"index" : 1,
"ttl" : "0 ms",
"name" : "rightState"
} ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
}, {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` INT, `order_ts_str` VARCHAR(2147483647), `order_ts` TIMESTAMP(3), `order_id` INT, `shipment_ts_str` VARCHAR(2147483647), `shipment_ts` TIMESTAMP(3)>",
"description" : "Join(joinType=[InnerJoin], where=[((id = order_id) AND (order_ts >= (shipment_ts - 5000:INTERVAL SECOND)) AND (order_ts <= (shipment_ts + 5000:INTERVAL SECOND)))], select=[id, order_ts_str, order_ts, order_id, shipment_ts_str, shipment_ts], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
}, {
"id" : 52,
"type" : "stream-exec-calc_1",
"projection" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : "INT"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 4,
"type" : "VARCHAR(2147483647)"
} ],
"condition" : null,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`order_id` INT, `order_ts_str` VARCHAR(2147483647), `shipment_ts_str` VARCHAR(2147483647)>",
"description" : "Calc(select=[id AS order_id, order_ts_str, shipment_ts_str])"
}, {
"id" : 53,
"type" : "stream-exec-sink_1",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
"table.exec.sink.rowtime-inserter" : "ENABLED",
"table.exec.sink.type-length-enforcer" : "IGNORE",
"table.exec.sink.upsert-materialize" : "AUTO"
},
"dynamicTableSink" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`sink_t`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "order_id",
"dataType" : "INT"
}, {
"name" : "order_ts_str",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "shipment_ts_str",
"dataType" : "VARCHAR(2147483647)"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ]
}
}
},
"inputChangelogMode" : [ "INSERT" ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`order_id` INT, `order_ts_str` VARCHAR(2147483647), `shipment_ts_str` VARCHAR(2147483647)>",
"description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, order_ts_str, shipment_ts_str])"
} ],
"edges" : [ {
"source" : 45,
"target" : 46,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 46,
"target" : 47,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 48,
"target" : 49,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 49,
"target" : 50,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 47,
"target" : 51,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 50,
"target" : 51,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 51,
"target" : 52,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 52,
"target" : 53,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
} ]
}
Binary file not shown.

Large diffs are not rendered by default.

Binary file not shown.
@@ -186,6 +186,32 @@ class ChangelogModeInferenceTest extends TableTestBase {
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}

@Test
def testIntervalJoin(): Unit = {
// Verifies that the plan contains an interval join
val sql =
"""
|SELECT * FROM Orders AS o
| JOIN ratesHistory AS r
| ON o.currency = r.currency
| WHERE o.rowtime BETWEEN r.rowtime - INTERVAL '5' SECOND AND r.rowtime + INTERVAL '5' SECOND
""".stripMargin
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}

@Test
def testIntervalJoinToRegularJoin(): Unit = {
// Verifies that the plan contains a regular join instead of an interval join
val sql =
"""
|SELECT * FROM Orders AS o
| JOIN ratesChangelogStream AS r
| ON o.currency = r.currency
| WHERE o.rowtime BETWEEN r.rowtime - INTERVAL '5' SECOND AND r.rowtime + INTERVAL '5' SECOND
""".stripMargin
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}

@Test
def testGroupByWithUnion(): Unit = {
util.addTable("""
Original file line number Diff line number Diff line change
@@ -63,6 +63,19 @@ class IntervalJoinTest extends TableTestBase {
| 'bounded' = 'false'
|)
""".stripMargin)
util.tableEnv.executeSql(s"""
|CREATE TABLE MyTable5 (
| a int,
| b bigint,
| c string,
| rowtime as TO_TIMESTAMP_LTZ(b, 3),
| watermark for rowtime as rowtime
|) WITH (
| 'connector' = 'values',
| 'bounded' = 'false',
| 'changelog-mode'='I,UA,UB'
|)
""".stripMargin)

/** There should exist exactly two time conditions * */
@Test
@@ -506,6 +519,20 @@ class IntervalJoinTest extends TableTestBase {
util.verifyExecPlan(sql)
}

@Test
def testIntervalJoinToRegularJoin(): Unit = {
// The following query is translated into a regular join instead of an interval join because
// the right table (t5) contains updates; the time attributes of both sides are materialized.
val sql =
"""
|SELECT t4.a FROM MyTable4 t4
| JOIN MyTable5 t5
| ON t4.a = t5.a
| AND t4.rowtime BETWEEN t5.rowtime - INTERVAL '5' SECOND AND t5.rowtime + INTERVAL '5' SECOND
""".stripMargin
util.verifyExecPlan(sql)
}

private def verifyTimeBoundary(
timeConditionSql: String,
expLeftSize: Long,