Skip to content

Saksham verma08/catchup read solution #2496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 58 additions & 64 deletions tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import java.util.stream.Collectors;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
Expand All @@ -69,6 +71,10 @@ public class PerfCommand implements AutoCloseable {

private static final ObjectWriter JSON = new ObjectMapper().writerWithDefaultPrettyPrinter();
private static final Logger LOGGER = LoggerFactory.getLogger(PerfCommand.class);

private static final String CATCHUP_TOPIC_PREFIX = "catchup-topic-prefix";



private final PerfConfig config;
private final TopicService topicService;
Expand Down Expand Up @@ -98,73 +104,61 @@ private PerfCommand(PerfConfig config) {
this.consumerService = new ConsumerService(config.bootstrapServer(), config.adminConfig());
}

private void run() {
LOGGER.info("Starting perf test with config: {}", jsonStringify(config));
TimerUtil timer = new TimerUtil();

if (config.reset) {
LOGGER.info("Deleting all test topics...");
int deleted = topicService.deleteTopics();
LOGGER.info("Deleted all test topics ({} in total), took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}

LOGGER.info("Creating topics...");
List<Topic> topics = topicService.createTopics(config.topicsConfig());
LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

LOGGER.info("Creating consumers...");
int consumers = consumerService.createConsumers(topics, config.consumersConfig());
consumerService.start(this::messageReceived, config.maxConsumeRecordRate);
LOGGER.info("Created {} consumers, took {} ms", consumers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

LOGGER.info("Creating producers...");
int producers = producerService.createProducers(topics, config.producersConfig(), this::messageSent);
LOGGER.info("Created {} producers, took {} ms", producers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

if (config.awaitTopicReady) {
LOGGER.info("Waiting for topics to be ready...");
waitTopicsReady(consumerService.consumerCount() > 0);
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}

Function<String, List<byte[]>> payloads = payloads(config, topics);
producerService.start(payloads, config.sendRate);
private void run(){

preparing = false;

if (config.warmupDurationMinutes > 0) {
LOGGER.info("Warming up for {} minutes...", config.warmupDurationMinutes);
long warmupStart = System.nanoTime();
long warmupMiddle = warmupStart + TimeUnit.MINUTES.toNanos(config.warmupDurationMinutes) / 2;
producerService.adjustRate(warmupStart, ProducerService.MIN_RATE);
producerService.adjustRate(warmupMiddle, config.sendRate);
collectStats(Duration.ofMinutes(config.warmupDurationMinutes));
}

Result result;
if (config.backlogDurationSeconds > 0) {
LOGGER.info("Pausing consumers for {} seconds to build up backlog...", config.backlogDurationSeconds);
consumerService.pause();
long backlogStart = System.currentTimeMillis();
collectStats(Duration.ofSeconds(config.backlogDurationSeconds));
long backlogEnd = System.nanoTime();

LOGGER.info("Resetting consumer offsets and resuming...");
consumerService.resetOffset(backlogStart, TimeUnit.SECONDS.toMillis(config.groupStartDelaySeconds));
consumerService.resume();

stats.reset();
producerService.adjustRate(config.sendRateDuringCatchup);
result = collectStats(backlogEnd);
} else {
LOGGER.info("Running test for {} minutes...", config.testDurationMinutes);
stats.reset();
result = collectStats(Duration.ofMinutes(config.testDurationMinutes));
}
LOGGER.info("Saving results to {}", saveResult(result));

+ List<Topic> topics;
boolean isCatchup = !Strings.isNullOrEmpty(config.catchupTopicPrefix());
if (isCatchup) {
LOGGER.info("Catch‑up mode enabled for prefix '{}'", config.catchupTopicPrefix());
}

+ if (config.reuseTopics()) {
+
+ String prefix = config.topicPrefix;
+ LOGGER.info("Reuse-topics enabled; listing existing topics with prefix '{}'", prefix);
+ Set<String> existing = topicService.listTestTopics(prefix);
+
+ List<String> desiredNames = config.topicsConfig().topicNames();
+
+ List<String> toCreate = desiredNames.stream()
+ .filter(name -> !existing.contains(name))
+ .collect(Collectors.toList());
+
+ if (!toCreate.isEmpty()) {
+ LOGGER.info("Creating {} missing topics...", toCreate.size());
+
+ TopicsConfig partial = new TopicsConfig(
+ prefix,
+ toCreate.size(),
+ config.partitionsPerTopic,
+ config.topicConfigs
+ );
+ topicService.createTopics(partial);
+ } else {
+ LOGGER.info("All {} topics already exist; skipping creation.", desiredNames.size());
+ }
+
+
+ LOGGER.info("Describing all {} topics to pass to producers/consumers", desiredNames.size());
+ topics = topicService.describeTopics(desiredNames);
+ LOGGER.info("Reused {} topics in total", topics.size());
+ } else {
+ // old behavior: optional reset, then create everything fresh
+ if (config.reset) {
+ LOGGER.info("Deleting all test topics...");
+ int deleted = topicService.deleteTopics();
+ LOGGER.info("Deleted {} topics in {} ms",
+ deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
+ }
+ LOGGER.info("Creating topics...");
+ topics = topicService.createTopics(config.topicsConfig());
+ LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
+ }

running = false;
}
}

private String jsonStringify(Object obj) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ public ConsumerService(String bootstrapServer, Properties properties) {
this.groupSuffix = new SimpleDateFormat("HHmmss").format(System.currentTimeMillis());
}

/**
 * Seeks every managed consumer back to the earliest offset of its
 * currently assigned partitions. A failure on one consumer is logged
 * and does not abort the resets of the remaining consumers.
 */
public void resetToBeginning() {
    consumers.forEach(consumer -> {
        try {
            consumer.seekToBeginning(consumer.assignment());
            LOGGER.info("Reset consumer to beginning: {}", consumer);
        } catch (Exception e) {
            LOGGER.error("Failed to reset consumer to beginning", e);
        }
    });
}

/**
* Create consumers for the given topics.
* Note: the created consumers will start polling immediately.
Expand Down Expand Up @@ -348,6 +359,11 @@ private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallbac
}
}

boolean isCatchup = !Strings.isNullOrEmpty(config.catchupTopicPrefix());
if (isCatchup) {
LOGGER.info("Catch‑up mode enabled for prefix '{}'", config.catchupTopicPrefix());
}

/**
* Signal the consumer to close.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public class PerfConfig {
public final String valueSchema;
public final String valuesFile;

public final String catchupTopicPrefix;
public final boolean reuseTopics;



public PerfConfig(String[] args) {
ArgumentParser parser = parser();

Expand Down Expand Up @@ -116,10 +121,25 @@ public PerfConfig(String[] args) {
valueSchema = ns.getString("valueSchema");
valuesFile = ns.get("valuesFile");

reuseTopics = ns.getBoolean("reuseTopics");
catchupTopicPrefix=ns.getString("catchupTopicPrefix");

if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) {
throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)",
backlogDurationSeconds, groupsPerTopic, groupStartDelaySeconds));
}

options.addOption(Option.builder()
.longOpt(PerfCommand.CATCHUP_TOPIC_PREFIX)
.hasArg()
.desc("Topic prefix to reuse for catch-up read testing")
.build());

CommandLine cmd = parser.parse(options, args);


this.catchupTopicPrefix = cmd.hasOption(PerfCommand.CATCHUP_TOPIC_PREFIX) ?
cmd.getOptionValue(PerfCommand.CATCHUP_TOPIC_PREFIX) : null;
}

public static ArgumentParser parser() {
Expand Down Expand Up @@ -276,6 +296,17 @@ public static ArgumentParser parser() {
.dest("valuesFile")
.metavar("VALUES_FILE")
.help("The avro value file. Example file content {\"f1\": \"value1\"}");

parser.addArgument("--reuse-topics")
.action(storeTrue())
.dest("reuseTopics")
.help("If set, do not delete existing topics ; only create the missing ones.")

parser.addArgument("--catchup-topic-prefix")
.type(String.class)
.dest("catchupTopicPrefix")
.meatavar("PREFIX")
.help("When set, reuse the existing topics with this prefix for catchup-read test")
return parser;
}

Expand All @@ -289,6 +320,10 @@ public Properties adminConfig() {
return properties;
}

/**
 * Whether existing test topics should be reused instead of being
 * deleted and recreated.
 *
 * @return the value of the {@code --reuse-topics} flag
 */
public boolean reuseTopics() {
    return this.reuseTopics;
}

public TopicsConfig topicsConfig() {
return new TopicsConfig(
topicPrefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;


import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicService implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(TopicService.class);
Expand All @@ -53,7 +60,7 @@ public class TopicService implements AutoCloseable {
/**
* The common prefix for performance test topics.
*/
private static final String COMMON_TOPIC_PREFIX = "__automq_perf_";
static final String COMMON_TOPIC_PREFIX = "__automq_perf_";

private final Admin admin;

Expand Down Expand Up @@ -130,6 +137,47 @@ private String generateTopicName(String topicPrefix, int partitions, int index)
return String.format("%s%s_%04d_%07d", COMMON_TOPIC_PREFIX, topicPrefix, partitions, index);
}

/**
 * Lists the names of all existing performance-test topics that match the
 * given user prefix. The internal {@code COMMON_TOPIC_PREFIX} is prepended
 * automatically, so the match is consistent with names produced by
 * {@code generateTopicName}.
 *
 * @param topicPrefix user-supplied prefix, without the common prefix
 * @return matching topic names; an empty mutable set if listing fails
 */
public Set<String> listTestTopics(String topicPrefix) {
    String fullPrefix = COMMON_TOPIC_PREFIX + topicPrefix;
    try {
        return admin
            .listTopics()
            .names()
            .get()
            .stream()
            .filter(name -> name.startsWith(fullPrefix))
            .collect(Collectors.toSet());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        LOGGER.error("Failed to list topics", e);
        return Sets.newHashSet();
    } catch (Exception e) {
        LOGGER.error("Failed to list topics", e);
        return Sets.newHashSet();
    }
}


/**
 * Describes the given topics via the admin client and adapts the result
 * to {@code Topic} objects (topic name plus partition count).
 *
 * @param names topic names to describe; may be empty
 * @return one {@code Topic} per requested name; on failure, a best-effort
 *         fallback list with a partition count of 0 for every name
 */
public List<Topic> describeTopics(Collection<String> names) {
    if (names.isEmpty()) {
        return List.of();
    }
    try {
        DescribeTopicsResult desc = admin.describeTopics(names);
        return desc
            .all()
            .get() // Map<String, TopicDescription>
            .entrySet()
            .stream()
            .map(e -> new Topic(e.getKey(), e.getValue().partitions().size()))
            .collect(Collectors.toList());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        LOGGER.error("Failed to describe topics {}", names, e);
        return zeroPartitionFallback(names);
    } catch (Exception e) {
        LOGGER.error("Failed to describe topics {}", names, e);
        return zeroPartitionFallback(names);
    }
}

// Best-effort fallback when describe fails: a Topic with 0 partitions per name.
private List<Topic> zeroPartitionFallback(Collection<String> names) {
    return names.stream()
        .map(n -> new Topic(n, 0))
        .collect(Collectors.toList());
}


public static class TopicsConfig {
final String topicPrefix;
final int topics;
Expand Down
Loading