Skip to content

Commit

Permalink
Add KRaft to ITs (#962)
Browse files Browse the repository at this point in the history
* Add KRaft to ITs

Signed-off-by: see-quick <[email protected]>

* more debug info

Signed-off-by: see-quick <[email protected]>

* better clean-up

Signed-off-by: see-quick <[email protected]>

* disable stc logging

Signed-off-by: see-quick <[email protected]>

* import checkstlye

Signed-off-by: see-quick <[email protected]>

* Update condition to prevent race

Signed-off-by: see-quick <[email protected]>

---------

Signed-off-by: see-quick <[email protected]>
  • Loading branch information
see-quick authored Jan 14, 2025
1 parent 3e983a2 commit 7c3fa61
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .azure/build-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ stages:
displayName: Java build
jobs:
- template: 'templates/jobs/build_java.yaml'
variables:
STRIMZI_TEST_CONTAINER_LOGGING_ENABLED: false
- stage: docs_build
displayName: Docs build
dependsOn: []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,20 @@ public void deleteTopics(Collection<String> topics) throws InterruptedException,
}

/**
* Method hasKafkaZeroTopics used for the race condition between in-memory kafka cluster and also encapsulate the get
* Wait until Kafka actually reports zero topics, or until we time out.
*/
public boolean hasKafkaZeroTopics() throws InterruptedException, ExecutionException {
Set<String> topicSet = adminClient.listTopics().names().get();
if (!topicSet.isEmpty()) {
LOGGER.error("Kafka should contain 0 topics but contains {}", topicSet.toString());
return false;
final int maxAttempts = 5;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
Set<String> topicSet = adminClient.listTopics().names().get();
if (topicSet.isEmpty()) {
return true;
}
LOGGER.warn("Topics still present on attempt {}: {}", attempt, topicSet);
Thread.sleep(1000);
}
return true;
LOGGER.error("Kafka did not report zero topics after {} attempts", maxAttempts);
return false;
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ConsumerGeneratedNameIT {
static {
if ("FALSE".equals(KAFKA_EXTERNAL_ENV)) {
kafkaContainer = new StrimziKafkaContainer()
.withKraft()
.waitForRunning();
kafkaContainer.start();
kafkaUri = kafkaContainer.getBootstrapServers();
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class HttpCorsIT {
static {
if ("FALSE".equals(KAFKA_EXTERNAL_ENV)) {
kafkaContainer = new StrimziKafkaContainer()
.withKraft()
.waitForRunning();
kafkaContainer.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class HttpBridgeITAbstract {
static {
if ("FALSE".equals(KAFKA_EXTERNAL_ENV)) {
kafkaContainer = new StrimziKafkaContainer()
.withKraft()
.waitForRunning();
kafkaContainer.start();

Expand Down Expand Up @@ -161,7 +162,23 @@ void setUpEach() {
void cleanUp() throws InterruptedException, ExecutionException {
Collection<String> topics = adminClientFacade.listTopic();
LOGGER.info("Kafka still contains {}", topics);
adminClientFacade.deleteTopics(topics);

if (!topics.isEmpty()) {
try {
adminClientFacade.deleteTopics(topics);
} catch (ExecutionException executionException) {
if (executionException.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
LOGGER.warn("Some topics not found (already deleted). Ignoring ...");
} else {
throw executionException;
}
}

Collection<String> remainingTopics = adminClientFacade.listTopic();
if (!remainingTopics.isEmpty()) {
LOGGER.error("Topics still present after cleanup: {}", remainingTopics);
}
}
}

protected String generateRandomConsumerGroupName() {
Expand Down

0 comments on commit 7c3fa61

Please sign in to comment.