From 7c3fa61acfa07182e62b3107a60fb594141d395e Mon Sep 17 00:00:00 2001 From: Maros Orsak Date: Tue, 14 Jan 2025 12:04:12 +0100 Subject: [PATCH] Add KRaft to ITs (#962) * Add KRaft to ITs Signed-off-by: see-quick * more debug info Signed-off-by: see-quick * better clean-up Signed-off-by: see-quick * disable stc logging Signed-off-by: see-quick * import checkstlye Signed-off-by: see-quick * Update condition to prevent race Signed-off-by: see-quick --------- Signed-off-by: see-quick --- .azure/build-pipeline.yaml | 2 ++ .../bridge/facades/AdminClientFacade.java | 17 +++++++++++------ .../bridge/http/ConsumerGeneratedNameIT.java | 1 + .../strimzi/kafka/bridge/http/HttpCorsIT.java | 1 + .../http/base/HttpBridgeITAbstract.java | 19 ++++++++++++++++++- 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/.azure/build-pipeline.yaml b/.azure/build-pipeline.yaml index 18598f5b3..bcb37ad1a 100644 --- a/.azure/build-pipeline.yaml +++ b/.azure/build-pipeline.yaml @@ -16,6 +16,8 @@ stages: displayName: Java build jobs: - template: 'templates/jobs/build_java.yaml' + variables: + STRIMZI_TEST_CONTAINER_LOGGING_ENABLED: false - stage: docs_build displayName: Docs build dependsOn: [] diff --git a/src/test/java/io/strimzi/kafka/bridge/facades/AdminClientFacade.java b/src/test/java/io/strimzi/kafka/bridge/facades/AdminClientFacade.java index 3d1fd16b3..b59d6f584 100644 --- a/src/test/java/io/strimzi/kafka/bridge/facades/AdminClientFacade.java +++ b/src/test/java/io/strimzi/kafka/bridge/facades/AdminClientFacade.java @@ -95,15 +95,20 @@ public void deleteTopics(Collection topics) throws InterruptedException, } /** - * Method hasKafkaZeroTopics used for the race condition between in-memory kafka cluster and also encapsulate the get + * Wait until Kafka actually reports zero topics, or until we time out. */ public boolean hasKafkaZeroTopics() throws InterruptedException, ExecutionException { - Set topicSet = adminClient.listTopics().names().get(); - if (!topicSet.isEmpty()) { - LOGGER.error("Kafka should contain 0 topics but contains {}", topicSet.toString()); - return false; + final int maxAttempts = 5; + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + Set topicSet = adminClient.listTopics().names().get(); + if (topicSet.isEmpty()) { + return true; + } + LOGGER.warn("Topics still present on attempt {}: {}", attempt, topicSet); + Thread.sleep(1000); } - return true; + LOGGER.error("Kafka did not report zero topics after {} attempts", maxAttempts); + return false; } public void close() { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java index cd48f7fdf..140d43ba1 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java @@ -59,6 +59,7 @@ public class ConsumerGeneratedNameIT { static { if ("FALSE".equals(KAFKA_EXTERNAL_ENV)) { kafkaContainer = new StrimziKafkaContainer() + .withKraft() .waitForRunning(); kafkaContainer.start(); kafkaUri = kafkaContainer.getBootstrapServers(); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java index b6c3f6be9..511b2861f 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java @@ -65,6 +65,7 @@ public class HttpCorsIT { static { if ("FALSE".equals(KAFKA_EXTERNAL_ENV)) { kafkaContainer = new StrimziKafkaContainer() + .withKraft() .waitForRunning(); kafkaContainer.start(); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java index 8926f01f0..b8a80ec44 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java @@ -72,6 +72,7 @@ public abstract class HttpBridgeITAbstract { static { if ("FALSE".equals(KAFKA_EXTERNAL_ENV)) { kafkaContainer = new StrimziKafkaContainer() + .withKraft() .waitForRunning(); kafkaContainer.start(); @@ -161,7 +162,23 @@ void setUpEach() { void cleanUp() throws InterruptedException, ExecutionException { Collection topics = adminClientFacade.listTopic(); LOGGER.info("Kafka still contains {}", topics); - adminClientFacade.deleteTopics(topics); + + if (!topics.isEmpty()) { + try { + adminClientFacade.deleteTopics(topics); + } catch (ExecutionException executionException) { + if (executionException.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) { + LOGGER.warn("Some topics not found (already deleted). Ignoring ..."); + } else { + throw executionException; + } + } + + Collection remainingTopics = adminClientFacade.listTopic(); + if (!remainingTopics.isEmpty()) { + LOGGER.error("Topics still present after cleanup: {}", remainingTopics); + } + } } protected String generateRandomConsumerGroupName() {