diff --git a/build.gradle.kts b/build.gradle.kts index 54de2fa..1205030 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,13 +1,7 @@ import com.bnorm.power.PowerAssertGradleExtension import kotlinx.knit.KnitPluginExtension -import org.gradle.api.tasks.testing.logging.TestExceptionFormat -import org.gradle.api.tasks.testing.logging.TestExceptionFormat.* -import org.gradle.api.tasks.testing.logging.TestLogEvent -import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED -import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED -import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED -import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR -import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT +import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL +import org.gradle.api.tasks.testing.logging.TestLogEvent.* import org.jetbrains.dokka.gradle.DokkaTask plugins { @@ -17,10 +11,12 @@ plugins { alias(libs.plugins.knit) alias(libs.plugins.publish) alias(libs.plugins.power.assert) + idea } repositories { mavenCentral() + maven("https://oss.sonatype.org/content/repositories/snapshots") } group = "io.github.nomisrev" @@ -34,8 +30,15 @@ dependencies { testImplementation(kotlin("test")) testImplementation(libs.testcontainers.kafka) - testImplementation(libs.slf4j.simple) testImplementation(libs.kotlinx.coroutines.test) + testImplementation(libs.jackson.kotlin) + testImplementation(libs.jackson.databind) + testImplementation(libs.kotest.framework.api) + testImplementation(libs.kotest.runner.junit5) + testImplementation(libs.kotest.property) + testImplementation(libs.stove.testing) + testImplementation(libs.stove.testing.kafka) + testImplementation(libs.logback.classic) } configure { @@ -52,10 +55,34 @@ configure { } } +sourceSets { + @Suppress("LocalVariableName", "ktlint:standard:property-naming") + val `test-e2e` by creating { + compileClasspath += sourceSets.main.get().output + runtimeClasspath += sourceSets.main.get().output + } + + val testE2eImplementation by configurations.getting { + extendsFrom(configurations.testImplementation.get()) + } + configurations["testE2eRuntimeOnly"].extendsFrom(configurations.runtimeOnly.get()) +} + +idea { + module { + testSources.from(sourceSets["test-e2e"].allSource.sourceDirectories) + testResources.from(sourceSets["test-e2e"].resources.sourceDirectories) + isDownloadJavadoc = true + isDownloadSources = true + } +} + kotlin { explicitApi() + jvmToolchain(17) } + tasks { withType().configureEach { outputDirectory.set(rootDir.resolve("docs")) @@ -88,5 +115,20 @@ tasks { exceptionFormat = FULL events = setOf(SKIPPED, FAILED, STANDARD_ERROR) } + jvmArgs("--add-opens", "java.base/java.util=ALL-UNNAMED") + } + + task("e2eTest") { + description = "Runs e2e tests." + group = "verification" + testClassesDirs = sourceSets["test-e2e"].output.classesDirs + classpath = sourceSets["test-e2e"].runtimeClasspath + + useJUnitPlatform() + reports { + junitXml.required.set(true) + html.required.set(true) + } + jvmArgs("--add-opens", "java.base/java.util=ALL-UNNAMED") } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 87a4245..9d40b29 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,6 +11,8 @@ slf4j = "2.0.12" spotless="6.25.0" publish="0.28.0" power-assert="0.13.0" +stove = "1.0.0-SNAPSHOT" +jackson = "2.17.1" [libraries] kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" } @@ -26,6 +28,18 @@ testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "tes slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } +jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } +jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } +logback-classic = { module = "ch.qos.logback:logback-classic", version = "1.5.6" } + +# Testing +stove-testing = { module = "com.trendyol:stove-testing-e2e", version.ref = "stove" } +stove-ktor-testing = { module = "com.trendyol:stove-ktor-testing-e2e", version.ref = "stove" } +stove-testing-kafka = { module = "com.trendyol:stove-testing-e2e-kafka", version.ref = "stove" } +kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" } +kotest-framework-api = { module = "io.kotest:kotest-framework-api", version.ref = "kotest" } + + [plugins] kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" } diff --git a/settings.gradle.kts b/settings.gradle.kts index 39bfed5..f08237e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -5,7 +5,9 @@ rootProject.name = "kotlin-kafka" dependencyResolutionManagement { repositories { mavenCentral() + maven("https://oss.sonatype.org/content/repositories/snapshots") } } + include(":guide") diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/KafkaApplicationUnderTest.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/KafkaApplicationUnderTest.kt new file mode 100644 index 0000000..0be795d --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/KafkaApplicationUnderTest.kt @@ -0,0 +1,97 @@ +package io.github.nomisRev.kafka.e2e.setup + +import com.trendyol.stove.testing.e2e.system.abstractions.ApplicationUnderTest +import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared +import io.github.nomisRev.kafka.e2e.setup.example.ReceiveMethod +import io.github.nomisRev.kafka.e2e.setup.example.StoveKafkaValueDeserializer +import io.github.nomisRev.kafka.e2e.setup.example.StoveKafkaValueSerializer +import io.github.nomisRev.kafka.publisher.KafkaPublisher +import io.github.nomisRev.kafka.publisher.PublisherSettings +import io.github.nomisRev.kafka.receiver.CommitStrategy +import io.github.nomisRev.kafka.receiver.KafkaReceiver +import io.github.nomisRev.kafka.receiver.ReceiverSettings +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import java.util.* +import kotlin.time.Duration.Companion.seconds + +/** + * Stove's Kafka application under test implementation + */ +class KafkaApplicationUnderTest : ApplicationUnderTest { + private lateinit var client: AdminClient + private val consumers: MutableList = mutableListOf() + + override suspend fun start(configurations: List) { + val bootstrapServers = configurations.first { it.contains("kafka.servers", true) }.split('=')[1] + val interceptorClass = configurations.first { it.contains("kafka.interceptor-classes", true) }.split('=')[1] + val receiveMethod = configurations.first { it.contains("kafka.receive-method", true) }.split('=')[1] + client = createAdminClient(bootstrapServers) + createTopics(client) + startConsumers(bootstrapServers, interceptorClass, ReceiveMethod.from(receiveMethod)) + } + + private fun createAdminClient(bootstrapServers: String): AdminClient { + return mapOf( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers + ).let { AdminClient.create(it) } + } + + private fun createTopics(client: AdminClient) { + val newTopics = KafkaTestShared.topics.flatMap { + listOf(it.topic, it.retryTopic, it.deadLetterTopic) + }.map { NewTopic(it, 1, 1) } + client.createTopics(newTopics).all().get() + } + + private fun startConsumers(bootStrapServers: String, interceptorClass: String, receiveMethod: ReceiveMethod) { + val (publisher, receiver) = createPublisherAndReceiver(interceptorClass, bootStrapServers) + val configuredConsumers = KafkaTestShared.consumers(receiver, publisher, receiveMethod) + configuredConsumers.forEach { it.start() } + consumers.addAll(configuredConsumers) + } + + private fun createPublisherAndReceiver( + interceptorClass: String, bootStrapServers: String + ): Pair, KafkaReceiver> { + val consumerSettings = mapOf( + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to "2000", + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG to "true", // Expected to be created by the client + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(interceptorClass) + ) + + val receiverSettings = ReceiverSettings(bootstrapServers = bootStrapServers, + valueDeserializer = StoveKafkaValueDeserializer(), + keyDeserializer = StringDeserializer(), + groupId = "stove-application-consumers", + commitStrategy = CommitStrategy.ByTime(2.seconds), + pollTimeout = 1.seconds, + properties = Properties().apply { + putAll(consumerSettings) + }) + + val producerSettings = PublisherSettings(bootStrapServers, + StringSerializer(), + StoveKafkaValueSerializer(), + properties = Properties().apply { + put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, listOf(interceptorClass) + ) + }) + + val publisher = KafkaPublisher(producerSettings) + val receiver = KafkaReceiver(receiverSettings) + return Pair(publisher, receiver) + } + + override suspend fun stop() { + client.close() + consumers.forEach { it.close() } + } +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/ProjectConfig.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/ProjectConfig.kt new file mode 100644 index 0000000..e4114aa --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/ProjectConfig.kt @@ -0,0 +1,26 @@ +package io.github.nomisRev.kafka.e2e.setup + +import com.trendyol.stove.testing.e2e.standalone.kafka.KafkaSystemOptions +import com.trendyol.stove.testing.e2e.standalone.kafka.kafka +import com.trendyol.stove.testing.e2e.system.TestSystem +import io.kotest.core.config.AbstractProjectConfig + +class ProjectConfig : AbstractProjectConfig() { + override suspend fun beforeProject(): Unit = TestSystem() + .with { + kafka { + KafkaSystemOptions( + configureExposedConfiguration = { cfg -> + listOf( + "kafka.servers=${cfg.bootstrapServers}", + "kafka.interceptor-classes=${cfg.interceptorClass}", + "kafka.receive-method=kotlin-kafka" // here we can change to: 'kotlin-kafka' or 'traditional' + ) + } + ) + } + applicationUnderTest(KafkaApplicationUnderTest()) + }.run() + + override suspend fun afterProject(): Unit = TestSystem.stop() +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ConsumerSupervisor.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ConsumerSupervisor.kt new file mode 100644 index 0000000..ce9dab4 --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ConsumerSupervisor.kt @@ -0,0 +1,113 @@ +package io.github.nomisRev.kafka.e2e.setup.example + +import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared.TopicDefinition +import io.github.nomisRev.kafka.publisher.KafkaPublisher +import io.github.nomisRev.kafka.receiver.KafkaReceiver +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.flattenConcat +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import java.time.Duration + +/** + * Supervisor that uses KafkaReceiver to retrieve messages from Kafka and handle them accordingly + */ +abstract class ConsumerSupervisor( + private val receiver: KafkaReceiver, + private val publisher: KafkaPublisher, + private val receiveMethod: ReceiveMethod +) : AutoCloseable { + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private val logger = org.slf4j.LoggerFactory.getLogger(javaClass) + + abstract val topicDefinition: TopicDefinition + + /** + * Here we start the consumer + * We can use either KotlinKafka receiver or traditional while(true) loop to receive messages + * Traditional while(true) loop is successful in receiving messages continuously + * KotlinKafka receiver, continuously receives messages + */ + fun start() = when (receiveMethod) { + ReceiveMethod.KOTLIN_KAFKA_RECEIVE -> kotlinKafkaReceive() + ReceiveMethod.TRADITIONAL_RECEIVE -> traditionalReceive() + } + + @OptIn(ExperimentalCoroutinesApi::class) + private fun kotlinKafkaReceive() { + scope.launch { + receiver.receiveAutoAck( + listOf( + topicDefinition.topic, + topicDefinition.retryTopic, + topicDefinition.deadLetterTopic + ) + ).flattenConcat() + .collect { message -> + logger.info("Message RECEIVED on the application side with KotlinKafka receiver: ${message.value()}") + received(message) // expected to receive the messages continuously? + } + } + } + + private fun traditionalReceive() { + scope.launch { + receiver.withConsumer { consumer -> + consumer.subscribe( + listOf( + topicDefinition.topic, + topicDefinition.retryTopic, + topicDefinition.deadLetterTopic + ) + ) + while (isActive) { + val records = consumer.poll(Duration.ofMillis(500)) + records.forEach { record -> + logger.info("Message RECEIVED on the application side with traditional while(true) loop: ${record.value()}") + received(record) { + consumer.commitAsync() + } + } + } + } + } + } + + + abstract suspend fun consume(record: ConsumerRecord) + + protected open suspend fun handleError(message: ConsumerRecord, e: Exception) { + logger.error("Failed to process message: $message", e) + } + + private suspend fun received(message: ConsumerRecord, onSuccess: (ConsumerRecord) -> Unit = { }) { + try { + consume(message) + onSuccess(message) + logger.info("Message COMMITTED on the application side: ${message.value()}") + } catch (e: Exception) { + handleError(message, e) + logger.warn("CONSUMER GOT an ERROR on the application side, exception: $e") + val record = ProducerRecord( + topicDefinition.deadLetterTopic, + message.partition(), + message.key(), + message.value(), + message.headers() + ) + try { + publisher.publishScope { offer(record) } + } catch (e: Exception) { + logger.error("Failed to publish message to dead letter topic: $message", e) + } + } + } + + override fun close(): Unit = runBlocking { + try { + scope.cancel() + } catch (e: Exception) { + logger.error("Failed to stop consuming", e) + } + } +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/DomainEvents.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/DomainEvents.kt new file mode 100644 index 0000000..628b2dd --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/DomainEvents.kt @@ -0,0 +1,5 @@ +package io.github.nomisRev.kafka.e2e.setup.example + +object DomainEvents { + data class ProductCreated(val productId: String) +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/KafkaTestShared.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/KafkaTestShared.kt new file mode 100644 index 0000000..ce92d0a --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/KafkaTestShared.kt @@ -0,0 +1,21 @@ +package io.github.nomisRev.kafka.e2e.setup.example + +import io.github.nomisRev.kafka.publisher.KafkaPublisher +import io.github.nomisRev.kafka.receiver.KafkaReceiver + +object KafkaTestShared { + data class TopicDefinition( + val topic: String, + val retryTopic: String, + val deadLetterTopic: String + ) + + val topics = listOf( + TopicDefinition("product", "product.retry", "product.error"), + ) + val consumers: ( + kafkaReceiver: KafkaReceiver, + producerSettings: KafkaPublisher, + receiveMethod: ReceiveMethod + ) -> List> = { a, b, r -> listOf(ProductConsumer(a, b, r)) } +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ProductConsumer.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ProductConsumer.kt new file mode 100644 index 0000000..e43012e --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ProductConsumer.kt @@ -0,0 +1,21 @@ +package io.github.nomisRev.kafka.e2e.setup.example + +import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared.TopicDefinition +import io.github.nomisRev.kafka.publisher.KafkaPublisher +import io.github.nomisRev.kafka.receiver.KafkaReceiver +import org.apache.kafka.clients.consumer.ConsumerRecord + +/** + * Just a regular consumer, nothing special, listening to the product, product.retry and product.error topics + */ +class ProductConsumer( + kafkaReceiver: KafkaReceiver, + kafkaPublisher: KafkaPublisher, + receiveMethod: ReceiveMethod +) : ConsumerSupervisor(kafkaReceiver, kafkaPublisher, receiveMethod) { + private val logger = org.slf4j.LoggerFactory.getLogger(javaClass) + override val topicDefinition: TopicDefinition = TopicDefinition("product", "product.retry", "product.error") + override suspend fun consume(record: ConsumerRecord) { + logger.info("ProductConsumer received: ${record.value()}") + } +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ReceiveMethod.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ReceiveMethod.kt new file mode 100644 index 0000000..96b0404 --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/ReceiveMethod.kt @@ -0,0 +1,26 @@ +package io.github.nomisRev.kafka.e2e.setup.example + +/** + * Receive method to change the way of receiving messages from Kafka + */ +enum class ReceiveMethod { + /** + * Using Kotlin Kafka receiver + */ + KOTLIN_KAFKA_RECEIVE, + + /** + * Using traditional while(true) loop to receive messages + */ + TRADITIONAL_RECEIVE; + + companion object { + fun from(value: String): ReceiveMethod = value.toKafkaReceiverMethod() + private fun String.toKafkaReceiverMethod(): ReceiveMethod = when (this) { + "kotlin-kafka" -> KOTLIN_KAFKA_RECEIVE + "traditional" -> TRADITIONAL_RECEIVE + else -> error("Unknown receive method: $this") + } + } + +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/SerDe.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/SerDe.kt new file mode 100644 index 0000000..7d1ccd8 --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/setup/example/SerDe.kt @@ -0,0 +1,24 @@ +package io.github.nomisRev.kafka.e2e.setup.example + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serializer + + +private val stoveKafkaObjectMapperRef = ObjectMapper() + +@Suppress("UNCHECKED_CAST") +class StoveKafkaValueDeserializer : Deserializer { + override fun deserialize( + topic: String, + data: ByteArray + ): T = stoveKafkaObjectMapperRef.readValue(data) as T +} + +class StoveKafkaValueSerializer : Serializer { + override fun serialize( + topic: String, + data: T + ): ByteArray = stoveKafkaObjectMapperRef.writeValueAsBytes(data) +} diff --git a/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/tests/KafkaSystemTests.kt b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/tests/KafkaSystemTests.kt new file mode 100644 index 0000000..9a6ca73 --- /dev/null +++ b/src/test-e2e/kotlin/io/github/nomisRev/kafka/e2e/tests/KafkaSystemTests.kt @@ -0,0 +1,25 @@ +package io.github.nomisRev.kafka.e2e.tests + +import arrow.core.some +import com.trendyol.stove.testing.e2e.standalone.kafka.kafka +import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate +import io.github.nomisRev.kafka.e2e.setup.example.DomainEvents.ProductCreated +import io.kotest.core.spec.style.FunSpec +import kotlin.random.Random +import kotlin.time.Duration.Companion.seconds + +class KafkaSystemTests : FunSpec({ + val randomString = { Random.nextInt(0, Int.MAX_VALUE).toString() } + + test("message should be committed and consumed successfully") { + validate { + kafka { + val productId = randomString() + "[productCreated]" + publish("product", message = ProductCreated(productId), key = randomString().some()) + shouldBeConsumed(10.seconds) { + actual.productId == productId + } + } + } + } +}) diff --git a/src/test-e2e/resources/logback-test.xml b/src/test-e2e/resources/logback-test.xml new file mode 100644 index 0000000..a1e9ff6 --- /dev/null +++ b/src/test-e2e/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + + %white([%t]) %highlight(%-5level) %magenta(%c{1}) %cyan(trace.id:%X{traceId} version:%X{version}) - + %yellow(%m) %n + + + + + + + + + + + + +