Skip to content

Commit f00f4fb

Browse files
committed
docs: migration notes and improvements for kafka and ser/de
1 parent 3702bc2 commit f00f4fb

File tree

5 files changed

+479
-246
lines changed

5 files changed

+479
-246
lines changed

docs/Components/01-couchbase.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ This function configures the Couchbase Docker container that is going to be star
1717
Here you can define a `defaultBucket` name.
1818

1919
!!! warning
20-
Make sure that your application has the same bucket names.
20+
Make sure that your application has the same bucket names.
2121

2222
```kotlin
2323
TestSystem()

docs/Components/02-kafka.md

Lines changed: 150 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Kafka
22

3-
There are two ways to work with Kafka in Stove.
3+
There are two ways to work with Kafka in Stove. You can use standalone Kafka or Kafka with Spring. You can use only one
4+
of them in your project.
45

56
## Standalone Kafka
67

@@ -12,56 +13,116 @@ There are two ways to work with Kafka in Stove.
1213
}
1314
```
1415

15-
## Configure
16+
### Configure
1617

1718
```kotlin
1819
TestSystem()
19-
.with {
20-
// other dependencies
21-
22-
kafka {
23-
stoveKafkaObjectMapperRef = objectMapperRef
24-
KafkaSystemOptions {
25-
listOf(
26-
"kafka.bootstrapServers=${it.bootstrapServers}",
27-
"kafka.interceptorClasses=${it.interceptorClass}"
28-
)
29-
}
30-
}
31-
}.run()
20+
.with {
21+
// other dependencies
22+
23+
kafka {
24+
stoveKafkaObjectMapperRef = objectMapperRef
25+
KafkaSystemOptions {
26+
listOf(
27+
"kafka.bootstrapServers=${it.bootstrapServers}",
28+
"kafka.interceptorClasses=${it.interceptorClass}"
29+
)
30+
}
31+
}
32+
}.run()
33+
```
3234

35+
The configuration values are:
36+
37+
```kotlin
38+
class KafkaSystemOptions(
39+
/**
40+
* Suffixes for error and retry topics in the application.
41+
*/
42+
val topicSuffixes: TopicSuffixes = TopicSuffixes(),
43+
/**
44+
* If true, the system will listen to the messages published by the Kafka system.
45+
*/
46+
val listenPublishedMessagesFromStove: Boolean = false,
47+
/**
48+
* The port of the bridge gRPC server that is used to communicate with the Kafka system.
49+
*/
50+
val bridgeGrpcServerPort: Int = stoveKafkaBridgePortDefault.toInt(),
51+
/**
52+
* The Serde that is used while asserting the messages,
53+
* serializing while bridging the messages. Take a look at the [serde] property for more information.
54+
*
55+
* The default value is [StoveSerde.jackson]'s anyByteArraySerde.
56+
* Depending on your application's needs you might want to change this value.
57+
*
58+
* The places where it was used listed below:
59+
*
60+
* @see [com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge] for bridging the messages.
61+
* @see StoveKafkaValueSerializer for serializing the messages.
62+
* @see StoveKafkaValueDeserializer for deserializing the messages.
63+
* @see valueSerializer for serializing the messages.
64+
*/
65+
val serde: StoveSerde<Any, ByteArray> = stoveSerdeRef,
66+
/**
67+
* The Value serializer that is used to serialize messages.
68+
*/
69+
val valueSerializer: Serializer<Any> = StoveKafkaValueSerializer(),
70+
/**
71+
* The options for the Kafka container.
72+
*/
73+
val containerOptions: KafkaContainerOptions = KafkaContainerOptions(),
74+
/**
75+
* The options for the Kafka system that is exposed to the application
76+
*/
77+
override val configureExposedConfiguration: (KafkaExposedConfiguration) -> List<String>
78+
) : SystemOptions, ConfiguresExposedConfiguration<KafkaExposedConfiguration>
3379
```
3480

35-
### Configuring Object Mapper
81+
### Configuring Serializer and Deserializer
3682

37-
Like every `SystemOptions` object, `KafkaSystemOptions` has a `stoveKafkaObjectMapperRef` field. You can set your own
38-
object mapper to this field. If you don't set it, Stove will use its default object mapper.
83+
Like every `SystemOptions` object, `KafkaSystemOptions` has a `serde` property that you can configure. It is a
84+
`StoveSerde` object that has two functions `serialize` and `deserialize`. You can configure them depending on your
85+
application's needs.
3986

4087
```kotlin
41-
var stoveKafkaObjectMapperRef: ObjectMapper = StoveObjectMapper.Default
88+
val kafkaSystemOptions = KafkaSystemOptions(
89+
serde = object : StoveSerde<Any, ByteArray> {
90+
override fun serialize(value: Any): ByteArray {
91+
return objectMapper.writeValueAsBytes(value)
92+
}
93+
94+
override fun <T> deserialize(value: ByteArray): T {
95+
return objectMapper.readValue(value, Any::class.java) as T
96+
}
97+
}
98+
)
4299
```
43100

44101
### Kafka Bridge With Your Application
45102

46103
Stove Kafka bridge is a **MUST** to work with Kafka. Otherwise you can't assert any messages from your application.
47104

48-
As you can see in the example above, you need to add a support to your application to work with interceptor that Stove provides.
105+
As you can see in the example above, you need to add a support to your application to work with interceptor that Stove
106+
provides.
49107

50108
```kotlin
51109
"kafka.interceptorClasses=com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge"
110+
111+
// or
112+
113+
"kafka.interceptorClasses={cfg.interceptorClass}" // cfg.interceptorClass is exposed by Stove
52114
```
53115

54116
!!! Important
55-
`kafka.` prefix is an assumption that you can change it with your own prefix.
56117

57-
Make sure that `StoveKafkaBridge` is in your classpath.
118+
`kafka.` prefix or `interceptorClasses` are assumptions that you can change it with your own prefix or configuration.
58119

59-
## Usage
120+
## Spring Kafka
60121

61-
When you want to use Kafka with Application Aware testing it provides more assertion capabilities. It is recommended way of working.
62-
Stove-Kafka does that with intercepting the messages.
122+
When you want to use Kafka with Application Aware testing it provides more assertion capabilities. It is recommended way
123+
of working. Stove-Kafka does that with intercepting the messages.
63124

64-
## How to get?
125+
### How to get?
65126

66127
=== "Gradle"
67128

@@ -81,11 +142,12 @@ Stove-Kafka does that with intercepting the messages.
81142
</dependency>
82143
```
83144

84-
## Configure
145+
### Configure
85146

86-
### Configuration Values
147+
#### Configuration Values
87148

88-
Kafka works with some settings as default, your application might have these values as not configurable, to make the application testable we need to tweak a little bit.
149+
Kafka works with some settings as default, your application might have these values as not configurable, to make the
150+
application testable we need to tweak a little bit.
89151

90152
If you have the following configurations:
91153

@@ -99,70 +161,76 @@ As an example:
99161

100162
```kotlin
101163
TestSystem()
102-
.with{
103-
httpClient()
104-
kafka()
105-
springBoot(
106-
runner = { parameters ->
107-
com.trendyol.exampleapp.run(parameters)
108-
},
109-
withParameters = listOf(
110-
"logging.level.root=error",
111-
"logging.level.org.springframework.web=error",
112-
"spring.profiles.active=default",
113-
"server.http2.enabled=false",
114-
"kafka.heartbeatInSeconds=2",
115-
"kafka.autoCreateTopics=true",
116-
"kafka.offset=earliest"
117-
)
118-
)
119-
}.run()
164+
.with {
165+
httpClient()
166+
kafka()
167+
springBoot(
168+
runner = { parameters ->
169+
com.trendyol.exampleapp.run(parameters)
170+
},
171+
withParameters = listOf(
172+
"logging.level.root=error",
173+
"logging.level.org.springframework.web=error",
174+
"spring.profiles.active=default",
175+
"server.http2.enabled=false",
176+
"kafka.heartbeatInSeconds=2",
177+
"kafka.autoCreateTopics=true",
178+
"kafka.offset=earliest"
179+
)
180+
)
181+
}.run()
120182
```
121183

122-
As you can see, we pass these configuration values as parameters. Since they are configurable, the application considers these values instead of application-default values.
184+
As you can see, we pass these configuration values as parameters. Since they are configurable, the application considers
185+
these values instead of application-default values.
123186

124187
### Consumer Settings
125188

126-
Second thing we need to do is tweak your consumer configuration. For that we will provide Stove-Kafka interceptor to your Kafka configuration.
189+
Second thing we need to do is tweak your consumer configuration. For that we will provide Stove-Kafka interceptor to
190+
your Kafka configuration.
127191

128-
Locate to the point where you define your `ConcurrentKafkaListenerContainerFactory` or where you can set the interceptor. Interceptor needs to implement `ConsumerAwareRecordInterceptor<String, String>` since
192+
Locate to the point where you define your `ConcurrentKafkaListenerContainerFactory` or where you can set the
193+
interceptor. Interceptor needs to implement `ConsumerAwareRecordInterceptor<String, String>` since
129194
Stove-Kafka [relies on that](https://github.com/Trendyol/stove/blob/main/starters/spring/stove-spring-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/kafka/TestSystemInterceptor.kt).
130195

131196
```kotlin
132197
@EnableKafka
133198
@Configuration
134199
class KafkaConsumerConfiguration(
135-
private val interceptor: ConsumerAwareRecordInterceptor<String, String>,
200+
private val interceptor: ConsumerAwareRecordInterceptor<String, String>,
136201
) {
137202

138-
@Bean
139-
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
140-
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
141-
// ...
142-
factory.setRecordInterceptor(interceptor)
143-
return factory
144-
}
203+
@Bean
204+
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
205+
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
206+
// ...
207+
factory.setRecordInterceptor(interceptor)
208+
return factory
209+
}
145210
}
146211
```
147212

148213
### Producer Settings
149214

150-
Make sure that the [aforementioned](#configuration-values) values are also configureable for producer settings, too.
151-
Stove will have access to `KafkaTemplate` and will use `setProducerListener` to arrange itself to listen produced messages.
215+
Make sure that the [aforementioned](#configuration-values) values are also configurable for producer settings, too.
216+
Stove will have access to `KafkaTemplate` and will use `setProducerListener` to arrange itself to listen produced
217+
messages.
152218

153219
### Plugging in
154220

155-
When all the configuration is done, it is time to tell to application to use our `TestSystemInterceptor` and configuration values.
221+
When all the configuration is done, it is time to tell to application to use our `TestSystemInterceptor` and
222+
configuration values.
156223

157224
#### TestSystemInterceptor and TestInitializer
158225

159226
```kotlin
160227
class TestInitializer : BaseApplicationContextInitializer({
161-
bean<TestSystemInterceptor>(isPrimary = true)
228+
bean<TestSystemInterceptor>(isPrimary = true)
229+
bean { StoveSerde.jackson.anyByteArraySerde(yourObjectMapper()) } // or any serde that implements StoveSerde<Any, ByteArray>
162230
})
163231

164232
fun SpringApplication.addTestDependencies() {
165-
this.addInitializers(TestInitializer())
233+
this.addInitializers(TestInitializer())
166234
}
167235
```
168236

@@ -172,30 +240,30 @@ fun SpringApplication.addTestDependencies() {
172240

173241
```kotlin hl_lines="4"
174242
springBoot(
175-
runner = { parameters ->
176-
com.trendyol.exampleapp.run(parameters) {
177-
addTestDependencies() // Enable TestInitializer with extensions call
178-
}
179-
},
180-
withParameters = listOf(
181-
"logging.level.root=error",
182-
"logging.level.org.springframework.web=error",
183-
"spring.profiles.active=default",
184-
"server.http2.enabled=false",
185-
"kafka.heartbeatInSeconds=2", // Added Parameter
186-
"kafka.autoCreateTopics=true", // Added Parameter
187-
"kafka.offset=earliest" // Added Parameter
188-
)
243+
runner = { parameters ->
244+
com.trendyol.exampleapp.run(parameters) {
245+
addTestDependencies() // Enable TestInitializer with extensions call
246+
}
247+
},
248+
withParameters = listOf(
249+
"logging.level.root=error",
250+
"logging.level.org.springframework.web=error",
251+
"spring.profiles.active=default",
252+
"server.http2.enabled=false",
253+
"kafka.heartbeatInSeconds=2", // Added Parameter
254+
"kafka.autoCreateTopics=true", // Added Parameter
255+
"kafka.offset=earliest" // Added Parameter
256+
)
189257
)
190258
```
191259

192260
Now you're full set and have control over Kafka messages from the testing context.
193261

194262
```kotlin
195263
TestSystem.validate {
196-
kafka {
197-
shouldBeConsumed<AnyEvent> { actual-> }
198-
shouldBePublished<AnyEvent> { actual-> }
199-
}
200-
}
264+
kafka {
265+
shouldBeConsumed<AnyEvent> { actual -> }
266+
shouldBePublished<AnyEvent> { actual -> }
267+
}
268+
}
201269
```

0 commit comments

Comments
 (0)