Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ apiValidation {
"ktor-example",
"micronaut-example",
"spring-example",
"spring-4x-example",
"spring-standalone-example",
"spring-streams-example"
)
Expand Down
29 changes: 29 additions & 0 deletions examples/spring-4x-example/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
alias(libs.plugins.spring.plugin)
alias(libs.plugins.spring.boot.four)
idea
application
}

dependencies {
implementation(libs.spring.boot.four)
implementation(libs.spring.boot.four.autoconfigure)
implementation(libs.spring.boot.four.webflux)
implementation(libs.spring.boot.four.actuator)
annotationProcessor(libs.spring.boot.four.annotationProcessor)
implementation(libs.spring.boot.four.kafka)
implementation(libs.kotlinx.reactor)
implementation(libs.kotlinx.core)
implementation(libs.kotlinx.reactive)
implementation(libs.kotlinx.slf4j)
}

dependencies {
testImplementation(libs.jackson3.kotlin)
testImplementation(projects.stove.lib.stoveTestingE2eHttp)
testImplementation(projects.stove.lib.stoveTestingE2eWiremock)
testImplementation(projects.stove.starters.spring.stoveSpring4xTestingE2e)
testImplementation(projects.stove.starters.spring.stoveSpring4xTestingE2eKafka)
}

application { mainClass.set("stove.spring.example4x.ExampleAppkt") }
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package stove.spring.example4x

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ConfigurableApplicationContext

@SpringBootApplication
class ExampleApp

fun main(args: Array<String>) {
run(args)
}

/**
* This is the point where spring application gets run.
* run(args, init) method is the important point for the testing configuration.
* init allows us to override any dependency from the testing side that is being time related or configuration related.
* Spring itself opens this configuration higher order function to the outside.
*/
fun run(
args: Array<String>,
init: SpringApplication.() -> Unit = {}
): ConfigurableApplicationContext = runApplication<ExampleApp>(*args, init = init)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package stove.spring.example4x.application.handlers

import org.springframework.stereotype.Service
import stove.spring.example4x.infrastructure.api.ProductCreateRequest

@Service
class ProductCreator {
suspend fun create(request: ProductCreateRequest) {
// In a real application, this would persist the product
println("Creating product: ${request.name} with id ${request.id}")
}
}

data class ProductCreatedEvent(
val id: Long,
val name: String,
val supplierId: Long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package stove.spring.example4x.infrastructure.api

import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import stove.spring.example4x.application.handlers.*
import stove.spring.example4x.infrastructure.messaging.kafka.KafkaProducer

@RestController
@RequestMapping("/api")
class ProductController(
private val productCreator: ProductCreator,
private val kafkaProducer: KafkaProducer
) {
@GetMapping("/index")
suspend fun index(
@RequestParam(required = false) keyword: String?
): ResponseEntity<String> = ResponseEntity.ok("Hi from Stove framework with ${keyword ?: "no keyword"}")

@PostMapping("/product/create")
suspend fun create(
@RequestBody request: ProductCreateRequest
): ResponseEntity<Any> {
productCreator.create(request)
kafkaProducer.send(ProductCreatedEvent(request.id, request.name, request.supplierId))
return ResponseEntity.ok().build()
}
}

data class ProductCreateRequest(
val id: Long,
val name: String,
val supplierId: Long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
@file:Suppress("DEPRECATION")

package stove.spring.example4x.infrastructure.messaging.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.*
import org.springframework.boot.context.properties.*
import org.springframework.context.annotation.*
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.RecordInterceptor
import org.springframework.kafka.support.serializer.*

@Configuration
@EnableKafka
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration {
@Bean
fun kafkaListenerContainerFactory(
consumerFactory: ConsumerFactory<String, String>,
interceptor: RecordInterceptor<String, String>?
): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.setConsumerFactory(consumerFactory)
interceptor?.let { factory.setRecordInterceptor(it) }
return factory
}

@Bean
@Suppress("MagicNumber")
fun consumerFactory(
config: KafkaProperties
): ConsumerFactory<String, String> = DefaultKafkaConsumerFactory(
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to config.groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to config.offset,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS to StringDeserializer::class.java,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to config.heartbeatInSeconds * 1000,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to config.heartbeatInSeconds * 3000,
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to config.heartbeatInSeconds * 3000
)
)

@Bean
fun kafkaTemplate(
config: KafkaProperties
): KafkaTemplate<String, Any> = KafkaTemplate(
DefaultKafkaProducerFactory(
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JacksonJsonSerializer::class.java,
ProducerConfig.ACKS_CONFIG to config.acks
)
)
)
}

@ConfigurationProperties(prefix = "kafka")
data class KafkaProperties(
val bootstrapServers: String,
val groupId: String = "spring-4x-example",
val offset: String = "earliest",
val acks: String = "1",
val heartbeatInSeconds: Int = 3,
val topicPrefix: String = "trendyol.stove.service"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package stove.spring.example4x.infrastructure.messaging.kafka

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
import stove.spring.example4x.application.handlers.ProductCreatedEvent

@Component
class KafkaProducer(
private val kafkaTemplate: KafkaTemplate<String, Any>,
private val kafkaProperties: KafkaProperties
) {
suspend fun send(event: ProductCreatedEvent) {
val topic = "${kafkaProperties.topicPrefix}.productCreated.1"
kafkaTemplate.send(topic, event.id.toString(), event)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package stove.spring.example4x.infrastructure.messaging.kafka

import org.slf4j.*
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.handler.annotation.*
import org.springframework.stereotype.Component
import stove.spring.example4x.application.handlers.ProductCreator
import stove.spring.example4x.infrastructure.api.ProductCreateRequest
import tools.jackson.databind.json.JsonMapper

@Component
class ProductCreateConsumer(
private val productCreator: ProductCreator,
private val jsonMapper: JsonMapper
) {
private val logger: Logger = LoggerFactory.getLogger(javaClass)

@KafkaListener(topics = ["trendyol.stove.service.product.create.0"], groupId = "\${kafka.groupId}")
suspend fun consume(
@Payload message: String,
@Header("X-UserEmail", required = false) userEmail: String?
) {
logger.info("Received message: $message with userEmail: $userEmail")
val command = jsonMapper.readValue(message, CreateProductCommand::class.java)
productCreator.create(ProductCreateRequest(command.id, command.name, command.supplierId))
}
}

data class CreateProductCommand(
val id: Long,
val name: String,
val supplierId: Long
)
14 changes: 14 additions & 0 deletions examples/spring-4x-example/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
spring:
application:
name: "stove-spring-4x-example"

server:
port: 8001

kafka:
bootstrapServers: localhost:9092
topicPrefix: trendyol.stove.service
acks: "1"
offset: "latest"
heartbeatInSeconds: 30
groupId: spring-4x-example
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.stove.spring.example4x.e2e

import arrow.core.some
import com.trendyol.stove.testing.e2e.http.*
import com.trendyol.stove.testing.e2e.kafka.kafka
import com.trendyol.stove.testing.e2e.system.*
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.string.shouldContain
import stove.spring.example4x.application.handlers.ProductCreatedEvent
import stove.spring.example4x.infrastructure.api.ProductCreateRequest
import stove.spring.example4x.infrastructure.messaging.kafka.CreateProductCommand
import kotlin.time.Duration.Companion.seconds

class ExampleTest :
FunSpec({
test("index should be reachable") {
TestSystem.validate {
http {
get<String>("/api/index", queryParams = mapOf("keyword" to testCase.name.name)) { actual ->
actual shouldContain "Hi from Stove framework with ${testCase.name.name}"
println(actual)
}
get<String>("/api/index") { actual ->
actual shouldContain "Hi from Stove framework with"
println(actual)
}
}
}
}

test("should create new product when send product create request from api") {
TestSystem.validate {
val productCreateRequest = ProductCreateRequest(1L, name = "product name", 99L)

http {
postAndExpectBodilessResponse(uri = "/api/product/create", body = productCreateRequest.some()) { actual ->
actual.status shouldBe 200
}
}

kafka {
shouldBePublished<ProductCreatedEvent> {
actual.id == productCreateRequest.id &&
actual.name == productCreateRequest.name &&
actual.supplierId == productCreateRequest.supplierId
}
}
}
}

test("should consume product create command from kafka") {
TestSystem.validate {
val createProductCommand = CreateProductCommand(2L, name = "product from kafka", 100L)

kafka {
publish("trendyol.stove.service.product.create.0", createProductCommand)
shouldBeConsumed<CreateProductCommand>(10.seconds) {
actual.id == createProductCommand.id &&
actual.name == createProductCommand.name &&
actual.supplierId == createProductCommand.supplierId
}
}
}
}
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.stove.spring.example4x.e2e

import com.trendyol.stove.testing.e2e.*
import com.trendyol.stove.testing.e2e.http.*
import com.trendyol.stove.testing.e2e.kafka.*
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.wiremock.*
import io.kotest.core.config.AbstractProjectConfig
import org.slf4j.*
import tools.jackson.databind.json.JsonMapper

class Stove : AbstractProjectConfig() {
private val logger: Logger = LoggerFactory.getLogger("WireMockMonitor")

override suspend fun beforeProject(): Unit =
TestSystem()
.with {
httpClient {
HttpClientSystemOptions(
baseUrl = "http://localhost:8005"
)
}
kafka {
KafkaSystemOptions(
containerOptions = KafkaContainerOptions(tag = "7.8.1")
) {
listOf(
"kafka.bootstrapServers=${it.bootstrapServers}",
"kafka.groupId=spring-4x-example"
)
}
}
bridge()
wiremock {
WireMockSystemOptions(
port = 7079,
removeStubAfterRequestMatched = true,
afterRequest = { e, _ ->
logger.info(e.request.toString())
}
)
}
springBoot(
runner = { parameters ->
stove.spring.example4x.run(parameters) {
addInitializers(
stoveSpringRegistrar {
registerBean<TestSystemKafkaInterceptor<*, *>>(primary = true)
registerBean {
val jsonMapper = this.bean<JsonMapper>()
StoveJackson3ThroughIfStringSerde(jsonMapper)
}
}
)
}
},
withParameters = listOf(
"server.port=8005",
"logging.level.root=info",
"logging.level.org.springframework.web=info",
"spring.profiles.active=default",
"kafka.heartbeatInSeconds=2",
"kafka.offset=earliest"
)
)
}.run()

override suspend fun afterProject(): Unit = TestSystem.stop()
}
Loading
Loading