Skip to content

Commit

Permalink
Merge branch 'main' into beta5
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev authored Apr 9, 2024
2 parents 911103d + 40f00d3 commit d42e9bf
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
distribution: 'zulu'
java-version: 17

- uses: gradle/gradle-build-action@v2
- uses: gradle/gradle-build-action@v3
with:
arguments: build --scan --full-stacktrace

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/githubpages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
with:
fetch-depth: 0

- uses: gradle/gradle-build-action@v2
- uses: gradle/gradle-build-action@v3
with:
arguments: -Pversion=${{ github.event.release.tag_name }} dokkaHtml

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
check:
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 120

steps:
- uses: actions/checkout@v4
Expand All @@ -21,9 +21,9 @@ jobs:
distribution: 'zulu'
java-version: 17

- uses: gradle/gradle-build-action@v2
- uses: gradle/gradle-build-action@v3
with:
arguments: build --scan --full-stacktrace
arguments: build --scan --full-stacktrace -PstressTest=100

- name: Bundle the build report
if: failure()
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
distribution: 'zulu'
java-version: 11

- uses: gradle/gradle-build-action@v2
- uses: gradle/gradle-build-action@v3
with:
arguments: assemble -Pversion=${{ inputs.version }}

Expand All @@ -42,6 +42,6 @@ jobs:
path: '**/build/reports/**'

- name: Publish final version
uses: gradle/gradle-build-action@v2
uses: gradle/gradle-build-action@v3
with:
arguments: -Pversion=${{ inputs.version }} publishAllPublicationsToMavenCentralRepository
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ tasks {

withType<Test>().configureEach {
useJUnitPlatform()
maxParallelForks = (2 * Runtime.getRuntime().availableProcessors())
if (project.hasProperty("stressTest")) {
systemProperty("io.github.nomisrev.kafka.TEST_ITERATIONS", project.properties["stressTest"] ?: 100)
}
testLogging {
exceptionFormat = FULL
events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
Expand Down
10 changes: 5 additions & 5 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[versions]
kotest = "5.8.0"
kafka = "3.6.1"
kotest = "5.8.1"
kafka = "3.7.0"
kotlin = "2.0.0-Beta5"
kotlinx-coroutines = "1.8.0"
dokka = "1.9.10"
dokka = "1.9.20"
knit = "0.5.0"
kover = "0.7.6"
testcontainers-kafka = "1.19.6"
testcontainers-kafka = "1.19.7"
slf4j = "2.0.12"
spotless="6.25.0"
publish="0.27.0"
publish="0.28.0"

[libraries]
kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" }
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
10 changes: 7 additions & 3 deletions src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.RangeAssignor
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -195,7 +194,7 @@ public fun <K, V> List<ConsumerRecords<K, V>>.offsets(
public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
name: String,
dispatcher: CoroutineDispatcher = IO,
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener(),
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener,
timeout: kotlin.time.Duration = 500.milliseconds,
): Flow<ConsumerRecord<K, V>> = flatMapConcat { consumer ->
consumer.subscribeTo(name, dispatcher, listener, timeout)
Expand All @@ -212,7 +211,7 @@ public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
public fun <K, V> KafkaConsumer<K, V>.subscribeTo(
name: String,
dispatcher: CoroutineDispatcher = IO,
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener(),
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener,
timeout: kotlin.time.Duration = 500.milliseconds,
): Flow<ConsumerRecord<K, V>> = flow {
subscribe(listOf(name), listener)
Expand Down Expand Up @@ -365,3 +364,8 @@ public data class ConsumerSettings<K, V>(
put(EXCLUDE_INTERNAL_TOPICS_CONFIG, excludeInternalTopics)
}
}

private object NoOpConsumerRebalanceListener : ConsumerRebalanceListener {
override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>?) {}
override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>?) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public data class PublisherSettings<Key, Value>(
val bootstrapServers: String,
val keySerializer: Serializer<Key>,
val valueSerializer: Serializer<Value>,
val acknowledgments: Acks = Acks.One,
val acknowledgments: Acks = Acks.All,
val closeTimeout: Duration = Duration.INFINITE,
val isFatal: (t: Throwable) -> Boolean =
{ it is AuthenticationException || it is ProducerFencedException },
Expand Down
32 changes: 20 additions & 12 deletions src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.common.Metric
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.AfterAll
Expand All @@ -49,6 +50,8 @@ import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds

private val testIterations: Int =
System.getProperties().getProperty("io.github.nomisrev.kafka.TEST_ITERATIONS")?.toIntOrNull() ?: 1

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
abstract class KafkaSpec {
Expand All @@ -62,7 +65,7 @@ abstract class KafkaSpec {
fun destroy() {
kafka.stop()
}

@BeforeAll
@JvmStatic
fun setup() {
Expand All @@ -85,10 +88,10 @@ abstract class KafkaSpec {
withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "true")
withReuse(true)
}

fun KafkaReceiver(): KafkaReceiver<String, String> =
KafkaReceiver(receiverSetting())

fun receiverSetting(): ReceiverSettings<String, String> =
ReceiverSettings(
bootstrapServers = kafka.bootstrapServers,
Expand Down Expand Up @@ -168,15 +171,17 @@ abstract class KafkaSpec {
partitions: Int = 4,
replicationFactor: Short = 1,
test: suspend TopicTestScope.(NewTopic) -> Unit
): Unit = runTest {
val topic = NewTopic(nextTopicName(), partitions, replicationFactor).configs(topicConfig)
admin {
createTopic(topic)
try {
TopicTestScope(topic, this@runTest).test(topic)
} finally {
topic.shouldBeEmpty()
deleteTopic(topic.name())
): Unit = repeat(testIterations) {
runTest {
val topic = NewTopic(nextTopicName(), partitions, replicationFactor).configs(topicConfig)
admin {
createTopic(topic)
try {
TopicTestScope(topic, this@runTest).test(topic)
} finally {
topic.shouldBeEmpty()
deleteTopic(topic.name())
}
}
}
}
Expand Down Expand Up @@ -295,6 +300,9 @@ abstract class KafkaSpec {
{
val producer = KafkaProducer(it.properties(), it.keySerializer, it.valueSerializer)
object : Producer<String, String> {
override fun clientInstanceId(p0: Duration?): Uuid =
producer.clientInstanceId(p0)

override fun close() {}

override fun close(timeout: Duration?) {}
Expand Down

0 comments on commit d42e9bf

Please sign in to comment.