add test KafkaTableITCase#testLatestOffsetStrategyResume
Tan-JiaLiang committed Sep 27, 2023
1 parent db787b3 commit f0029d0
Showing 1 changed file with 133 additions and 0 deletions.
@@ -18,12 +18,19 @@

package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.EncodingUtils;
@@ -944,6 +951,132 @@ public void testPerPartitionWatermarkWithIdleSource() throws Exception {
deleteTestTopic(topic);
}

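/**
 * Verifies that a job started with 'scan.startup.mode' = 'latest-offset' resumes from the
 * offsets stored in a savepoint rather than re-applying the latest-offset strategy: records
 * produced while the job is stopped must still be consumed after the restore.
 */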
@Test
public void testLatestOffsetStrategyResume() throws Exception {
// Use a unique topic name for each parameterized run so that the topic can always be created.
final String topic = "latest_offset_resume_topic_" + format + "_" + UUID.randomUUID();
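// Create the topic with 6 partitions and replication factor 1; only partitions 0-2
// receive data before the job is stopped.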
createTestTopic(topic, 6, 1);

// ---------- Consume stream from Kafka -------------------

String groupId = getStandardProps().getProperty("group.id");
String bootstraps = getBootstrapServers();

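// Note: 'sink.partitioner' is set to TestPartitioner, which (defined elsewhere in this
// test class) is assumed to route each row to the Kafka partition named by its first
// column, so the VALUES statements below target specific partitions deterministically.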
final String createTable =
String.format(
"CREATE TABLE kafka (\n"
+ " `partition_id` INT,\n"
+ " `value` INT\n"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'topic' = '%s',\n"
+ " 'properties.bootstrap.servers' = '%s',\n"
+ " 'properties.group.id' = '%s',\n"
+ " 'scan.startup.mode' = 'latest-offset',\n"
+ " 'sink.partitioner' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
topic, bootstraps, groupId, TestPartitioner.class.getName(), format);

tEnv.executeSql(createTable);

env.setParallelism(1);
String createSink =
"CREATE TABLE MySink(\n"
+ " `id` INT,\n"
+ " `value` INT\n"
+ ") WITH (\n"
+ " 'connector' = 'values'\n"
+ ")";
tEnv.executeSql(createSink);

String executeInsert = "INSERT INTO MySink SELECT `partition_id`, `value` FROM kafka";
TableResult tableResult = tEnv.executeSql(executeInsert);

// ---------- Produce data into Kafka's partitions 0-2 -------------------

String initialValues = "INSERT INTO kafka VALUES (0, 0), (1, 0), (2, 0)";
tEnv.executeSql(initialValues).await();

final List<String> expected = Arrays.asList("+I[0, 0]", "+I[1, 0]", "+I[2, 0]");
KafkaTableTestUtils.waitingExpectedResults("MySink", expected, Duration.ofSeconds(5));

// ---------- Stop the consuming job with a savepoint -------------------

String savepointBasePath = getTempDirPath(topic + "-savepoint");
assert tableResult.getJobClient().isPresent();
JobClient client = tableResult.getJobClient().get();
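// Stop the job with a savepoint: 'false' means do not advance to end of event time
// (no drain), and the savepoint is written in the default (canonical) format.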
String savepointPath =
client.stopWithSavepoint(false, savepointBasePath, SavepointFormatType.DEFAULT)
.get();

// ---------- Produce data into Kafka's partitions 0-5 -------------------

String produceValuesBeforeResume =
"INSERT INTO kafka VALUES (0, 1), (1, 1), (2, 1), (3, 0), (4, 0), (5, 0)";
tEnv.executeSql(produceValuesBeforeResume).await();

// ---------- Resume the consuming job from the savepoint -------------------

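// Pointing the new environment at the savepoint via 'execution.savepoint.path' makes the
// restarted job restore the source offsets from state instead of starting from the
// latest offsets again.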
Configuration configuration = new Configuration();
configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());

tEnv.executeSql(createTable);
tEnv.executeSql(createSink);
tableResult = tEnv.executeSql(executeInsert);

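// The rows produced while the job was stopped ((0,1) ... (5,0)) must show up after the
// restore; if the job had re-applied the latest-offset strategy they would be skipped.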
final List<String> afterResumeExpected =
Arrays.asList(
"+I[0, 0]",
"+I[1, 0]",
"+I[2, 0]",
"+I[0, 1]",
"+I[1, 1]",
"+I[2, 1]",
"+I[3, 0]",
"+I[4, 0]",
"+I[5, 0]");
KafkaTableTestUtils.waitingExpectedResults(
"MySink", afterResumeExpected, Duration.ofSeconds(5));

// ---------- Produce data into Kafka's partitions 0-5 again -------------------

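// Produce through the outer table environment (this.tEnv); the local tEnv created above
// shadows the field and belongs to the resumed job's environment.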
String produceValuesAfterResume =
"INSERT INTO kafka VALUES (0, 2), (1, 2), (2, 2), (3, 1), (4, 1), (5, 1)";
this.tEnv.executeSql(produceValuesAfterResume).await();

final List<String> afterProduceExpected =
Arrays.asList(
"+I[0, 0]",
"+I[1, 0]",
"+I[2, 0]",
"+I[0, 1]",
"+I[1, 1]",
"+I[2, 1]",
"+I[3, 0]",
"+I[4, 0]",
"+I[5, 0]",
"+I[0, 2]",
"+I[1, 2]",
"+I[2, 2]",
"+I[3, 1]",
"+I[4, 1]",
"+I[5, 1]");
KafkaTableTestUtils.waitingExpectedResults(
"MySink", afterProduceExpected, Duration.ofSeconds(5));

// ------------- cleanup -------------------

tableResult.getJobClient().ifPresent(JobClient::cancel);
deleteTestTopic(topic);
}

@Test
public void testStartFromGroupOffsetsLatest() throws Exception {
testStartFromGroupOffsets("latest");
