Skip to content

Commit

Permalink
[FLINK-30935][connectors/kafka] Add Kafka serializers deserialize che…
Browse files Browse the repository at this point in the history
…ck when using SimpleVersionedSerializer
  • Loading branch information
chucheng92 committed Mar 22, 2023
1 parent 58f3537 commit 0c541a6
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public byte[] serialize(KafkaCommittable state) throws IOException {

@Override
public KafkaCommittable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}

private KafkaCommittable deserializeV1(byte[] serialized) throws IOException {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {
final short epoch = in.readShort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public byte[] serialize(KafkaWriterState state) throws IOException {

@Override
public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}

private KafkaWriterState deserializeV1(byte[] serialized) throws IOException {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {
final String transactionalIdPrefx = in.readUTF();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public byte[] serialize(KafkaPartitionSplit split) throws IOException {

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 0:
return deserializeV0(serialized);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}

private KafkaPartitionSplit deserializeV0(byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
String topic = in.readUTF();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import java.io.IOException;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Tests for serializing and deserialzing {@link KafkaCommittable} with {@link
Expand All @@ -41,4 +43,14 @@ public void testCommittableSerDe() throws IOException {
final byte[] serialized = SERIALIZER.serialize(committable);
assertThat(SERIALIZER.deserialize(1, serialized)).isEqualTo(committable);
}

@Test
public void testCommittableSerDeWithUnsupportedVersion() throws IOException {
final String transactionalId = "test-id";
final short epoch = 5;
final KafkaCommittable committable = new KafkaCommittable(1L, epoch, transactionalId, null);
final byte[] serialized = SERIALIZER.serialize(committable);
assertThatThrownBy(() -> SERIALIZER.deserialize(0, serialized))
.satisfies(anyCauseMatches("Unrecognized version or corrupt state"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import java.io.IOException;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Tests for serializing and deserialzing {@link KafkaWriterState} with {@link
Expand All @@ -39,4 +41,12 @@ public void testStateSerDe() throws IOException {
final byte[] serialized = SERIALIZER.serialize(state);
assertThat(SERIALIZER.deserialize(1, serialized)).isEqualTo(state);
}

@Test
public void testStateSerDeWithUnsupportedVersion() throws IOException {
final KafkaWriterState state = new KafkaWriterState("idPrefix");
final byte[] serialized = SERIALIZER.serialize(state);
assertThatThrownBy(() -> SERIALIZER.deserialize(0, serialized))
.satisfies(anyCauseMatches("Unrecognized version or corrupt state"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.io.IOException;
import java.util.List;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link KafkaPartitionSplitSerializer}. */
public class KafkaPartitionSplitSerializerTest {
Expand All @@ -51,4 +53,26 @@ public void testSerializer() throws IOException {
assertThat(deserializeSplit).isEqualTo(kafkaPartitionSplit);
}
}

@Test
public void testSerializerWithUnsupportedVersion() throws IOException {
String topic = "topic";
Long offsetZero = 0L;
Long normalOffset = 1L;
TopicPartition topicPartition = new TopicPartition(topic, 1);
List<Long> stoppingOffsets =
Lists.newArrayList(
KafkaPartitionSplit.COMMITTED_OFFSET,
KafkaPartitionSplit.LATEST_OFFSET,
offsetZero,
normalOffset);
KafkaPartitionSplitSerializer splitSerializer = new KafkaPartitionSplitSerializer();
for (Long stoppingOffset : stoppingOffsets) {
KafkaPartitionSplit kafkaPartitionSplit =
new KafkaPartitionSplit(topicPartition, 0, stoppingOffset);
byte[] serialize = splitSerializer.serialize(kafkaPartitionSplit);
assertThatThrownBy(() -> splitSerializer.deserialize(1, serialize))
.satisfies(anyCauseMatches("Unrecognized version or corrupt state"));
}
}
}

0 comments on commit 0c541a6

Please sign in to comment.