Skip to content

Commit

Permalink
Update kafka-clients to 3.6.0 in series/3.x (#1263)
Browse files Browse the repository at this point in the history
* Update kafka-clients to 3.6.0 in series/3.x

* `InvalidProducerEpochException` fixed to `ProducerFencedException`

---------

Co-authored-by: a.artigao <[email protected]>
  • Loading branch information
scala-steward and aartigao authored Oct 21, 2023
1 parent 9572ca0 commit 4f51f27
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 6 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ val confluentVersion = "7.5.1"

val fs2Version = "3.9.2"

val kafkaVersion = "3.5.1"
val kafkaVersion = "3.6.0"

val testcontainersScalaVersion = "0.41.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import fs2.kafka.internal.converters.collection._
import fs2.kafka.producer.MkProducer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidProducerEpochException
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.EitherValues
import scala.concurrent.duration._
Expand Down Expand Up @@ -418,9 +418,6 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}

// TODO: after switching from ForEachTestContainer to ForAllTestContainer, this fails
// if run with a shared container with the following error:
// org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. was not an instance of org.apache.kafka.common.errors.InvalidProducerEpochException, but an instance of org.apache.kafka.common.errors.ProducerFencedException
class TransactionalKafkaProducerTimeoutSpec extends BaseKafkaSpec with EitherValues {
it("should use user-specified transaction timeouts") {
withTopic { topic =>
Expand Down Expand Up @@ -466,7 +463,7 @@ class TransactionalKafkaProducerTimeoutSpec extends BaseKafkaSpec with EitherVal
result <- Stream.eval(producer.produce(records).attempt)
} yield result).compile.lastOrError.unsafeRunSync()

produced.left.value shouldBe an[InvalidProducerEpochException]
produced.left.value shouldBe an[ProducerFencedException]

val consumedOrError = {
Either.catchNonFatal(
Expand Down

0 comments on commit 4f51f27

Please sign in to comment.