Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37403] Convert interval join to regular join for updating source #26230

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

bvarghese1
Copy link
Contributor

What is the purpose of the change

  • Previously, if inputs to interval join contains updates the planner throws an exception.
  • This commit introduces an enhancement to convert interval join to regular join if any of the input to interval join contains updates

Brief change log

  • Check the input modify kind set for interval join node to see if it contains updates
  • If updates are present convert interval join to regular join node

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for ChangelogModeInference
  • Added restore tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

- Previously, if inputs to interval join contains updates the planner throws
  an exception.
- This commit introduces an enhancement to convert interval join to regular join
  if any of the input to interval join contains updates
@flinkbot
Copy link
Collaborator

flinkbot commented Feb 28, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

ModifyKind.UPDATE) || inputModifyKindSet.contains(ModifyKind.DELETE)
if (containsUpdatesOrDeletes) {
// Convert to regular join if the input contains Updates
val isInnerJoin = intervalJoin.joinSpec.getJoinType == FlinkJoinType.INNER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why only inner and not all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as the RegularJoin condition
RegularJoin has conditions for INNER and SEMI. However, IntervalJoin doesn't support SEMI
Ref: https://github.com/apache/flink/pull/26230/files#diff-ddf95eb3949ab889e8e3bccdacb9f57553aac06657574fd6316ca17dd48321a1R262

@@ -35,7 +35,13 @@ public IntervalJoinRestoreTest() {
public List<TableTestProgram> programs() {
return Arrays.asList(
IntervalJoinTestPrograms.INTERVAL_JOIN_EVENT_TIME,
IntervalJoinTestPrograms.INTERVAL_JOIN_EVENT_TIME_UPDATING_SOURCE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a restore test. Restoring doesn't need to be tested.
I added a SemanticTestBase recently to test an execution only:
Take this as an example: #26236

intervalJoin.getCondition,
intervalJoin.getJoinType,
intervalJoin.getHints)
createNewNode(regularJoin, children, providedTrait, requiredTrait, requester)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be that this approach has one major issue. What is the data type of the timestamp column? Is it still marked as a time attribute or is it a regular column? If it is still marked as a time attribute, windows or subsequent interval joins would still be allowed by the planner. Of course the output would be garbage in this case.

Copy link
Contributor Author

@bvarghese1 bvarghese1 Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a commit to materialize time attributes while processing IntervalJoin if the source has updates.
This makes the conversion in FlinkChangelogModeInferenceProgram redundant.

- Materialize time attributes in IntervalJoin if any of the source contains
  updates
- This will convert IntervalJoin to RegularJoin
Copy link
Contributor

@davidradl davidradl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a comment on the Jira asking a question; could you address that question please, in case it proves to be a better approach? The question is can we incorporate updates by amending the interval join processing? Changing the interval join to a regular join, seems strange as we are not honouring the requested join type - so could be confusing to the user, would a meaningful error message also be an option. WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants