Skip to content

Commit e69d126

Browse files
JonCookJonathan Cook
and
Jonathan Cook
authored
BAEL-4609 - Testing Kafka and Spring Boot (eugenp#10249)
* BAEL-4437 - System Rules * BAEL-4687 Testing Kafka and Spring Boot * BAEL-4609 - Testing Kafka and Spring Boot Co-authored-by: Jonathan Cook <[email protected]>
1 parent 8f425e5 commit e69d126

File tree

10 files changed

+286
-41
lines changed

10 files changed

+286
-41
lines changed

spring-kafka/pom.xml

+18-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
45
<modelVersion>4.0.0</modelVersion>
56
<artifactId>spring-kafka</artifactId>
6-
<version>0.0.1-SNAPSHOT</version>
77
<name>spring-kafka</name>
88
<description>Intro to Kafka with Spring</description>
99

@@ -15,25 +15,36 @@
1515
</parent>
1616

1717
<dependencies>
18-
1918
<dependency>
2019
<groupId>org.springframework.boot</groupId>
2120
<artifactId>spring-boot-starter</artifactId>
2221
</dependency>
23-
2422
<dependency>
2523
<groupId>org.springframework.kafka</groupId>
2624
<artifactId>spring-kafka</artifactId>
25+
<version>${spring-kafka.version}</version>
2726
</dependency>
28-
2927
<dependency>
3028
<groupId>com.fasterxml.jackson.core</groupId>
3129
<artifactId>jackson-databind</artifactId>
3230
</dependency>
31+
<dependency>
32+
<groupId>org.springframework.kafka</groupId>
33+
<artifactId>spring-kafka-test</artifactId>
34+
<version>${spring-kafka.version}</version>
35+
<scope>test</scope>
36+
</dependency>
37+
<dependency>
38+
<groupId>org.testcontainers</groupId>
39+
<artifactId>kafka</artifactId>
40+
<version>${testcontainers-kafka.version}</version>
41+
<scope>test</scope>
42+
</dependency>
3343
</dependencies>
3444

3545
<properties>
36-
<spring-kafka.version>2.3.7.RELEASE</spring-kafka.version>
46+
<spring-kafka.version>2.5.8.RELEASE</spring-kafka.version>
47+
<testcontainers-kafka.version>1.15.0</testcontainers-kafka.version>
3748
</properties>
3849

3950
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.kafka.embedded;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
5+
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.springframework.kafka.annotation.KafkaListener;
7+
import org.springframework.stereotype.Component;
8+
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
@Component
13+
public class KafkaConsumer {
14+
15+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
16+
17+
private CountDownLatch latch = new CountDownLatch(1);
18+
private String payload = null;
19+
20+
@KafkaListener(topics = "${test.topic}")
21+
public void receive(ConsumerRecord<?, ?> consumerRecord) {
22+
LOGGER.info("received payload='{}'", consumerRecord.toString());
23+
setPayload(consumerRecord.toString());
24+
latch.countDown();
25+
}
26+
27+
public CountDownLatch getLatch() {
28+
return latch;
29+
}
30+
31+
public String getPayload() {
32+
return payload;
33+
}
34+
35+
private void setPayload(String payload) {
36+
this.payload = payload;
37+
}
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.baeldung.kafka.embedded;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.kafka.core.KafkaTemplate;
7+
import org.springframework.stereotype.Component;
8+
9+
@Component
10+
public class KafkaProducer {
11+
12+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
13+
14+
@Autowired
15+
private KafkaTemplate<String, String> kafkaTemplate;
16+
17+
public void send(String topic, String payload) {
18+
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
19+
kafkaTemplate.send(topic, payload);
20+
}
21+
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.baeldung.kafka.embedded;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
5+
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
7+
@SpringBootApplication
8+
@EnableAutoConfiguration
9+
public class KafkaProducerConsumerApplication {
10+
11+
public static void main(String[] args) {
12+
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
13+
}
14+
15+
}

spring-kafka/src/test/java/com/baeldung/SpringContextLiveTest.java

-17
This file was deleted.

spring-kafka/src/test/java/com/baeldung/SpringContextManualTest.java

-17
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.baeldung.kafka.embedded;
2+
3+
import static org.hamcrest.CoreMatchers.containsString;
4+
import static org.hamcrest.CoreMatchers.equalTo;
5+
import static org.hamcrest.MatcherAssert.assertThat;
6+
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.boot.test.context.SpringBootTest;
13+
import org.springframework.kafka.core.KafkaTemplate;
14+
import org.springframework.kafka.test.context.EmbeddedKafka;
15+
import org.springframework.test.annotation.DirtiesContext;
16+
17+
@SpringBootTest
18+
@DirtiesContext
19+
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
20+
class EmbeddedKafkaIntegrationTest {
21+
22+
@Autowired
23+
public KafkaTemplate<String, String> template;
24+
25+
@Autowired
26+
private KafkaConsumer consumer;
27+
28+
@Autowired
29+
private KafkaProducer producer;
30+
31+
@Value("${test.topic}")
32+
private String topic;
33+
34+
@Test
35+
public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
36+
template.send(topic, "Sending with default template");
37+
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
38+
assertThat(consumer.getLatch().getCount(), equalTo(0L));
39+
40+
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
41+
}
42+
43+
@Test
44+
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
45+
producer.send(topic, "Sending with our own simple KafkaProducer");
46+
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
47+
48+
assertThat(consumer.getLatch().getCount(), equalTo(0L));
49+
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
50+
}
51+
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.baeldung.kafka.testcontainers;
2+
3+
import static org.hamcrest.CoreMatchers.containsString;
4+
import static org.hamcrest.CoreMatchers.equalTo;
5+
import static org.hamcrest.MatcherAssert.assertThat;
6+
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.apache.kafka.clients.consumer.ConsumerConfig;
12+
import org.apache.kafka.clients.producer.ProducerConfig;
13+
import org.apache.kafka.common.serialization.StringDeserializer;
14+
import org.apache.kafka.common.serialization.StringSerializer;
15+
import org.junit.ClassRule;
16+
import org.junit.Test;
17+
import org.junit.runner.RunWith;
18+
import org.springframework.beans.factory.annotation.Autowired;
19+
import org.springframework.beans.factory.annotation.Value;
20+
import org.springframework.boot.test.context.SpringBootTest;
21+
import org.springframework.boot.test.context.TestConfiguration;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.context.annotation.Import;
24+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
25+
import org.springframework.kafka.core.ConsumerFactory;
26+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
27+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
28+
import org.springframework.kafka.core.KafkaTemplate;
29+
import org.springframework.kafka.core.ProducerFactory;
30+
import org.springframework.test.annotation.DirtiesContext;
31+
import org.springframework.test.context.junit4.SpringRunner;
32+
import org.testcontainers.containers.KafkaContainer;
33+
import org.testcontainers.utility.DockerImageName;
34+
35+
import com.baeldung.kafka.embedded.KafkaConsumer;
36+
import com.baeldung.kafka.embedded.KafkaProducer;
37+
import com.baeldung.kafka.embedded.KafkaProducerConsumerApplication;
38+
39+
@RunWith(SpringRunner.class)
40+
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersIntegrationTest.KafkaTestContainersConfiguration.class)
41+
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
42+
@DirtiesContext
43+
public class KafkaTestContainersIntegrationTest {
44+
45+
@ClassRule
46+
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
47+
48+
@Autowired
49+
public KafkaTemplate<String, String> template;
50+
51+
@Autowired
52+
private KafkaConsumer consumer;
53+
54+
@Autowired
55+
private KafkaProducer producer;
56+
57+
@Value("${test.topic}")
58+
private String topic;
59+
60+
@Test
61+
public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
62+
template.send(topic, "Sending with default template");
63+
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
64+
65+
assertThat(consumer.getLatch().getCount(), equalTo(0L));
66+
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
67+
}
68+
69+
@Test
70+
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
71+
producer.send(topic, "Sending with own controller");
72+
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
73+
74+
assertThat(consumer.getLatch().getCount(), equalTo(0L));
75+
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
76+
}
77+
78+
@TestConfiguration
79+
static class KafkaTestContainersConfiguration {
80+
81+
@Bean
82+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
83+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
84+
factory.setConsumerFactory(consumerFactory());
85+
return factory;
86+
}
87+
88+
@Bean
89+
public ConsumerFactory<Integer, String> consumerFactory() {
90+
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
91+
}
92+
93+
@Bean
94+
public Map<String, Object> consumerConfigs() {
95+
Map<String, Object> props = new HashMap<>();
96+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
97+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
98+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
99+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
100+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
101+
return props;
102+
}
103+
104+
@Bean
105+
public ProducerFactory<String, String> producerFactory() {
106+
Map<String, Object> configProps = new HashMap<>();
107+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
108+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
109+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
110+
return new DefaultKafkaProducerFactory<>(configProps);
111+
}
112+
113+
@Bean
114+
public KafkaTemplate<String, String> kafkaTemplate() {
115+
return new KafkaTemplate<>(producerFactory());
116+
}
117+
118+
}
119+
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
spring:
2+
kafka:
3+
consumer:
4+
auto-offset-reset: earliest
5+
group-id: baeldung
6+
test:
7+
topic: embedded-test-topic
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<configuration>
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder>
5+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
6+
</pattern>
7+
</encoder>
8+
</appender>
9+
10+
<root level="INFO">
11+
<appender-ref ref="STDOUT" />
12+
</root>
13+
</configuration>

0 commit comments

Comments
 (0)