-
Notifications
You must be signed in to change notification settings - Fork 123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-30935][connector/kafka] Add kafka serializers version check when using SimpleVersionedSerializer #10
base: main
Are you sure you want to change the base?
Conversation
@leonardBang @PatrickRen sorry to bother you, |
@tzulitai hi, Gordon, can you help to take a look please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @chucheng92. The changes make sense, but I left a comment to make sure we do our due diligence in documenting historical schema versions. This would allow us to more confidently merge this change.
@@ -46,6 +46,15 @@ public byte[] serialize(KafkaCommittable state) throws IOException { | |||
|
|||
@Override | |||
public KafkaCommittable deserialize(int version, byte[] serialized) throws IOException { | |||
switch (version) { | |||
case 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we know if there wasn't historical versions where the version number is 0
or some other number?
Is there existing migration IT tests that verify, with this change, we can still safely restore from previous versions?
In general, it looks like we're missing some documentation where we historically track what versions have existed before and what their corresponding schema is. I think having a class-level Javadoc to document this is already enough.
@chucheng92 do you think you'd be able to do this as part of this PR contribution? Basically, look at historical changes to confirm that this was indeed the only used version number + document its schema in the class-level Javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tzulitai hi, Gordon. thanks for your reviewing! the version from 1 can be found at https://github.com/apache/flink/pull/16676/files#diff-decc6e90291b6f7c78efae7bd3223d218a1aba00bbbf53e4c696d63506e907cf. however, we can't always check PR to see if there is an old version or a version change. So I agree with you to add JavaDocs to record serialized version evolutions. I added it in the pr. WDYT?
@tzulitai hi, Gordon. It's been a long time, please let me know if you have any concerns, I will fix it ASAP. thanks. |
@chucheng92 Can you please rebase your PR? |
…ck when using SimpleVersionedSerializer
0e2ac10
to
3a118a3
Compare
Yes, I have rebased it and passed ci. If you have time, pls help to review it again. thanks a lot. |
@mas-chen Do you want to do a review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chucheng92 is this truly a problem? There is only one version of the Kafka state. We only make version changes if there is backward incompatible change to the state serializers
What is the purpose of the change
Add deserialize version check for kafka simple versioned serializers like other SimpleVersionedSerializer implementations in case of incompatible or corrupt state when restoring from checkpoint.
Brief change log
Add deserialize version check logic for kafka simple versioned serializers.
Verifying this change
Add cases in KafkaCommittableSerializerTest and KafkaWriterStateSerializerTest and KafkaPartitionSplitSerializerTest.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation