Skip to content

feat:add new prefix #2400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/automq-perf-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1024M"
fi

exec "$(dirname "$0")/kafka-run-class.sh" -name kafkaClient -loggc org.apache.kafka.tools.automq.PerfCommand "$@"
3 changes: 3 additions & 0 deletions s3stream/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
tasks.withType(JavaCompile) {
options.compilerArgs += ["-Xlint:-this-escape"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,28 @@ private PerfCommand(PerfConfig config) {
private void run() {
LOGGER.info("Starting perf test with config: {}", jsonStringify(config));
TimerUtil timer = new TimerUtil();

if (config.reset) {
LOGGER.info("Deleting all test topics...");
int deleted = topicService.deleteTopics();
LOGGER.info("Deleted all test topics ({} in total), took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

// Modified topic initialization logic
List<Topic> topics;
if (config.catchupTopicPrefix != null && !config.catchupTopicPrefix.isEmpty()) {
LOGGER.info("Listing existing topics with prefix {}...", config.catchupTopicPrefix);
topics = topicService.listTopicsByPrefix(config.catchupTopicPrefix);
LOGGER.info("Found {} existing topics for catchup test", topics.size());

if (topics.isEmpty()) {
throw new RuntimeException("No topics found with prefix: " + config.catchupTopicPrefix);
}
} else {
if (config.reset) {
LOGGER.info("Deleting all test topics...");
int deleted = topicService.deleteTopics();
LOGGER.info("Deleted {} test topics, took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}
LOGGER.info("Creating {} new topics...", config.topics);
topics = topicService.createTopics(config.topicsConfig());
LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}

LOGGER.info("Creating topics...");
List<Topic> topics = topicService.createTopics(config.topicsConfig());
LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

LOGGER.info("Creating consumers...");
int consumers = consumerService.createConsumers(topics, config.consumersConfig());
consumerService.start(this::messageReceived, config.maxConsumeRecordRate);
Expand All @@ -117,8 +128,15 @@ private void run() {
waitTopicsReady(consumerService.consumerCount() > 0);
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

// Modified producer start logic
Function<String, List<byte[]>> payloads = payloads(config, topics);
producerService.start(payloads, config.sendRate);
if (config.catchupTopicPrefix != null) {
LOGGER.info("Starting catchup test with existing topics");
producerService.start(payloads, config.sendRate);
} else {
LOGGER.info("Starting normal test with new topics");
producerService.start(payloads, config.sendRate);
}

preparing = false;

Expand All @@ -132,7 +150,7 @@ private void run() {
}

Result result;
if (config.backlogDurationSeconds > 0) {
if (config.catchupTopicPrefix == null && config.backlogDurationSeconds > 0) {
LOGGER.info("Pausing consumers for {} seconds to build up backlog...", config.backlogDurationSeconds);
consumerService.pause();
long backlogStart = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class PerfConfig {
public final int reportingIntervalSeconds;
public final String valueSchema;
public final String valuesFile;
public final String catchupTopicPrefix;

public PerfConfig(String[] args) {
ArgumentParser parser = parser();
Expand Down Expand Up @@ -105,7 +106,16 @@ public PerfConfig(String[] args) {
reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds");
valueSchema = ns.getString("valueSchema");
valuesFile = ns.get("valuesFile");
catchupTopicPrefix = ns.getString("catchupTopicPrefix");

if (catchupTopicPrefix != null && !catchupTopicPrefix.isEmpty()) {
if (reset) {
throw new IllegalArgumentException(
"Cannot use --reset with --catchup-topic-prefix"
);
}
}

if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) {
throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)",
backlogDurationSeconds, groupsPerTopic, groupStartDelaySeconds));
Expand Down Expand Up @@ -260,6 +270,12 @@ public static ArgumentParser parser() {
.dest("valuesFile")
.metavar("VALUES_FILE")
.help("The avro value file. Example file content {\"f1\": \"value1\"}");
parser.addArgument("--catchup-topic-prefix")
.type(String.class)
.dest("catchupTopicPrefix")
.metavar("CATCHUP_TOPIC_PREFIX")
.help("Prefix of existing topics to reuse for catchup testing");

return parser;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.TopicDescription;

public class TopicService implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(TopicService.class);
Expand Down Expand Up @@ -78,7 +80,30 @@ public List<Topic> createTopics(TopicsConfig config) {
.map(name -> new Topic(name, config.partitionsPerTopic))
.collect(Collectors.toList());
}

/**
* List all performance test topics.
*/
public List<Topic> listTopicsByPrefix(String prefix) {
try {
// Automatically add the unified prefix for performance test topics
String fullPrefix = COMMON_TOPIC_PREFIX + prefix;

List<String> topicNames = admin.listTopics().names().get()
.stream()
// Fix filter condition: use full prefix matching
.filter(name -> name.startsWith(fullPrefix))
.collect(Collectors.toList());

// Fix deprecated all() method invocation
Map<String, TopicDescription> descriptions = admin.describeTopics(topicNames).allTopicNames().get();

return descriptions.values().stream()
.map(desc -> new Topic(desc.name(), desc.partitions().size()))
.collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to list topics with prefix: " + prefix, e);
}
}
/**
* Delete all historical performance test topics.
*/
Expand Down
Loading