Rewrite TestAvroConsumerConfluent with new Kafka source
PatrickRen committed Sep 4, 2024
1 parent 3f39a7b commit 73c2010
Showing 1 changed file with 20 additions and 19 deletions.
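The heart of the change is swapping the deprecated FlinkKafkaConsumer, attached via env.addSource, for the unified KafkaSource builder attached via env.fromSource. Below is a minimal sketch of that pattern, assembled from the calls that appear in the diff; the class name, broker address, topic, group id, and registry URL are illustrative placeholders, not values from the commit, and User is the Avro-generated class the example already uses.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import example.avro.User;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The old FlinkKafkaConsumer took a Properties object; the builder uses explicit setters.
        KafkaSource<User> source =
                KafkaSource.<User>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setGroupId("test-group") // placeholder
                        .setTopics("test-input-topic") // placeholder
                        // Deserialize only the record value with the Confluent Registry Avro schema.
                        .setDeserializer(
                                KafkaRecordDeserializationSchema.valueOnly(
                                        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                                User.class, "http://localhost:8081"))) // placeholder URL
                        // Replaces the old consumer's setStartFromEarliest().
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .build();

        // env.fromSource replaces env.addSource and takes an explicit watermark strategy.
        DataStreamSource<User> input =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        input.print();
        env.execute("KafkaSource sketch");
    }
}

The diff below applies this pattern to the existing TestAvroConsumerConfluent example.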
@@ -17,23 +17,23 @@
 
 package org.apache.flink.schema.registry.test;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
 import example.avro.User;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.kafka.clients.producer.ProducerConfig;
-
-import java.util.Properties;
 
 /**
  * A simple example that shows how to read from and write to Kafka with Confluent Schema Registry.
@@ -57,42 +57,43 @@ public static void main(String[] args) throws Exception {
                             + "--schema-registry-url <confluent schema registry> --group.id <some id>");
             return;
         }
-        Properties config = new Properties();
-        config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
-        config.setProperty("group.id", parameterTool.getRequired("group.id"));
         String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStreamSource<User> input =
-                env.addSource(
-                        new FlinkKafkaConsumer<>(
-                                        parameterTool.getRequired("input-topic"),
+        String bootstrapServers = parameterTool.getRequired("bootstrap.servers");
+        KafkaSource<User> kafkaSource =
+                KafkaSource.<User>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setGroupId(parameterTool.getRequired("group.id"))
+                        .setTopics(parameterTool.getRequired("input-topic"))
+                        .setDeserializer(
+                                KafkaRecordDeserializationSchema.valueOnly(
                                         ConfluentRegistryAvroDeserializationSchema.forSpecific(
-                                                User.class, schemaRegistryUrl),
-                                        config)
-                                .setStartFromEarliest());
+                                                User.class, schemaRegistryUrl)))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .build();
+
+        DataStreamSource<User> input =
+                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
 
         SingleOutputStreamOperator<String> mapToString =
                 input.map((MapFunction<User, String>) SpecificRecordBase::toString);
 
         KafkaSink<String> stringSink =
                 KafkaSink.<String>builder()
-                        .setBootstrapServers(
-                                config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setBootstrapServers(bootstrapServers)
                        .setRecordSerializer(
                                 KafkaRecordSerializationSchema.builder()
                                         .setValueSerializationSchema(new SimpleStringSchema())
                                         .setTopic(parameterTool.getRequired("output-string-topic"))
                                         .build())
-                        .setKafkaProducerConfig(config)
                         .build();
         mapToString.sinkTo(stringSink);
 
         KafkaSink<User> avroSink =
                 KafkaSink.<User>builder()
-                        .setBootstrapServers(
-                                config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setBootstrapServers(bootstrapServers)
                         .setRecordSerializer(
                                 KafkaRecordSerializationSchema.builder()
                                         .setValueSerializationSchema(
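On the sink side, the hunk keeps the existing KafkaSink builders and only swaps the bootstrap-server lookup from the removed Properties/ProducerConfig indirection to the new bootstrapServers variable; the tail of the file, including the Avro sink's value serializer, lies outside the shown hunk. The sketch below fills that serializer in purely as an assumption, using ConfluentRegistryAvroSerializationSchema (whose import already appears in the file); the subject and topic names are hypothetical and the broker and registry addresses are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

import example.avro.User;

public class KafkaSinkSketch {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092"; // placeholder
        String schemaRegistryUrl = "http://localhost:8081"; // placeholder

        // String sink, as shown in the hunk: plain UTF-8 values written to a string topic.
        KafkaSink<String> stringSink =
                KafkaSink.<String>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .setTopic("output-string-topic") // placeholder
                                        .build())
                        .build();

        // Avro sink: the value serializer below is an assumption, not visible in the hunk;
        // the subject name "output-avro-topic-value" is hypothetical.
        KafkaSink<User> avroSink =
                KafkaSink.<User>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setValueSerializationSchema(
                                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                                        User.class,
                                                        "output-avro-topic-value",
                                                        schemaRegistryUrl))
                                        .setTopic("output-avro-topic") // hypothetical
                                        .build())
                        .build();
    }
}

In the example itself, the string sink is attached with mapToString.sinkTo(stringSink), as shown in the hunk; the Avro sink would be attached to the User stream the same way via sinkTo.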
