Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37460][state] Make state processor API checkpoint ID configurable #26299

Merged
merged 1 commit into from
Mar 20, 2025

Conversation

gaborgsomogyi
Copy link
Contributor

This is backport of #26285.

What is the purpose of the change

At the moment the state processor API is using hardcoded 0 as checkpoint ID.
In this PR I've made it configurable and savepoint modification keeps the original checkpoint ID.

Brief change log

Made state processor API checkpoint ID configurable and savepoint modification keeps the original ID.

Verifying this change

Changed automated tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

pom.xml Outdated
<version>0.17.1</version>
<version>0.18.5</version>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is new!☝️The following error is shown by japicmp:

[ERROR] Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.17.1:cmp (default) on project flink-state-processor-api: There is at least one incompatibility: org.apache.flink.state.api.OperatorTransformation.bootstrapWith(org.apache.flink.api.java.DataSet,long):CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.state.api.OperatorTransformation.bootstrapWith(org.apache.flink.streaming.api.datastream.DataStream,long):CLASS_GENERIC_TEMPLATE_CHANGED -> [Help 1]

The problem is caused by siom79/japicmp#368.
The issue behind can be solved the following ways:

  • We increase the jacmp version where the mentioned bug has been resolved
  • We add class to the whitelist

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @gaborgsomogyi How about bump this on master branch and backport it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with anything since this is ugly here. Separating the intentions is good in general so doing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it's merged then we're going to have the possibility to remove this part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's no more in this change.

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 14, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gaborgsomogyi
Copy link
Contributor Author

@gaborgsomogyi
Copy link
Contributor Author

@grzegorz-liter-getindata can you please check your side?

@grzegorz-liter-getindata

@gaborgsomogyi yeah, give me couple of days thou please. Is there some date set for 1.20.2 release we need to met?

@gaborgsomogyi
Copy link
Contributor Author

@grzegorz-liter-getindata any update? Related the 1.20.2 it's not yet planned but I'm pretty sure it's going to happen since 1.20 has LTS support.

Copy link
Contributor

@davidradl davidradl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a new configurable thing - it should have associated documentation to detail how, when and why this would be useful.

@gliter
Copy link

gliter commented Mar 20, 2025

@gaborgsomogyi I finished similar test like before
Setup:
Cluster running from official dist 1.20.1
Job build with official 1.20.1 that generates random numbers and output to Kafka
State migration job build with 1.20-SNAPSHOT built from your branch

  1. Start the job, let it run for a while
  2. Stop with savepoint
  3. Check if KafkaCommittable is present
    OK: KafkaCommittable{producerId=18, epoch=0, transactionalId=kafka-sink-0-18}
  4. Change uid of stateful-function to stateful-function-tmp
  5. Bootstrap stateful-function state using stateful-function-tmp as source and change value by 1000
  6. Validate checkpointId is preserved after migration
    OK: checkpointId is preserved
  7. Start job from savepoint
    OK: job continues from last checkpointId
  8. state was updated and loaded correctly, outputs are now higher by 1000
  9. stop the job with savepoint and check if KafkaCommitable was properly handled - that is, Commitable from checkpoint before migration was cleaned and there is only latest Commitable
    OK: Kafka commitable: KafkaCommittable{producerId=55, epoch=0, transactionalId=kafka-sink-0-54}

Everything seems to work in order. Great thanks for quick resolution 🙌 .

@gaborgsomogyi gaborgsomogyi merged commit e24c477 into apache:release-1.20 Mar 20, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants