3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -12,3 +12,6 @@ db0ea0ffd3d0956211a680941426f73ba7ec581b

# Scala Steward: Reformat with scalafmt 3.8.6
acac8aa54dcf2d38b236d78703691cc9a4f594c5

# Scala Steward: Reformat with scalafmt 3.9.9
1fd9d1bac63fc0ae488e93b9112dc0ae61177fd4
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
version = "3.9.6"
version = "3.9.9"

# Scala 2 with -Xsource:3 compiler option
runner.dialect = scala213source3
20 changes: 10 additions & 10 deletions build.sbt
@@ -107,7 +107,7 @@ lazy val docs = project
.enablePlugins(BuildInfoPlugin, DocusaurusPlugin, MdocPlugin, ScalaUnidocPlugin)

lazy val dependencySettings = Seq(
resolvers += "confluent".at("https://packages.confluent.io/maven/"),
resolvers += "confluent".at("https://packages.confluent.io/maven/"),
libraryDependencies ++= Seq(
"com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
"com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
@@ -146,9 +146,9 @@ lazy val mdocSettings = Seq(
scalacOptions --= Seq("-Xfatal-warnings", "-Ywarn-unused"),
crossScalaVersions := Seq(scala213),
ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(core, vulcan),
ScalaUnidoc / unidoc / target := (LocalRootProject / baseDirectory)
ScalaUnidoc / unidoc / target := (LocalRootProject / baseDirectory)
.value / "website" / "static" / "api",
cleanFiles += (ScalaUnidoc / unidoc / target).value,
cleanFiles += (ScalaUnidoc / unidoc / target).value,
docusaurusCreateSite := docusaurusCreateSite
.dependsOn(Compile / unidoc)
.dependsOn(ThisBuild / updateSiteVariables)
@@ -171,7 +171,7 @@ lazy val mdocSettings = Seq(
lazy val buildInfoSettings = Seq(
buildInfoPackage := "fs2.kafka.build",
buildInfoObject := "info",
buildInfoKeys := Seq[BuildInfoKey](
buildInfoKeys := Seq[BuildInfoKey](
scalaVersion,
scalacOptions,
sourceDirectory,
@@ -244,7 +244,7 @@ lazy val publishSettings =
homepage := Some(url("https://fd4s.github.io/fs2-kafka")),
licenses := List("Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.txt")),
startYear := Some(2018),
headerLicense := Some(
headerLicense := Some(
de.heikoseeberger
.sbtheader
.License
@@ -255,7 +255,7 @@ lazy val publishSettings =
)
),
headerSources / excludeFilter := HiddenFileFilter,
developers := List(
developers := List(
tlGitHubDev("vlovgr", "Viktor LΓΆvgren")
.withEmail("[email protected]")
.withUrl(url("https://vlovgr.se")),
@@ -341,7 +341,7 @@ lazy val scalaSettings = Seq(
Compile / compile / scalacOptions ++= {
if (tlIsScala3.value) Seq.empty else Seq("-Xsource:3")
},
Test / console / scalacOptions := (Compile / console / scalacOptions).value,
Test / console / scalacOptions := (Compile / console / scalacOptions).value,
Compile / unmanagedSourceDirectories ++=
Seq(
baseDirectory.value / "src" / "main" / {
@@ -379,9 +379,9 @@ ThisBuild / updateSiteVariables := {

val variables =
Map[String, String](
"organization" -> (LocalRootProject / organization).value,
"coreModuleName" -> (core / moduleName).value,
"latestVersion" -> latestVersion.value,
"organization" -> (LocalRootProject / organization).value,
"coreModuleName" -> (core / moduleName).value,
"latestVersion" -> latestVersion.value,
"scalaPublishVersions" -> {
val minorVersions = (core / crossScalaVersions).value.map(minorVersion)
if (minorVersions.size <= 2) minorVersions.mkString(" and ")
2 changes: 1 addition & 1 deletion docs/src/main/mdoc/admin.md
@@ -117,7 +117,7 @@ def consumerGroupOperations[F[_]: Async: cats.Parallel]: F[Unit] =
for {
consumerGroupIds <- client.listConsumerGroups.groupIds
_ <- client.describeConsumerGroups(consumerGroupIds)
_ <- consumerGroupIds.parTraverse { groupId =>
_ <- consumerGroupIds.parTraverse { groupId =>
client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata
}
} yield ()
78 changes: 41 additions & 37 deletions docs/src/main/mdoc/consumers.md
@@ -417,41 +417,43 @@ object WithGracefulShutdownExampleCE2 extends IOApp.Simple {
for {
stoppedDeferred <- Deferred[IO, Either[Throwable, Unit]] // [1]
gracefulShutdownStartedRef <- Ref[IO].of(false) // [2]
_ <- KafkaConsumer
_ <- KafkaConsumer
.resource(consumerSettings)
.allocated
.bracketCase { case (consumer, _) => // [3]
run(consumer)
.attempt
.flatMap { result: Either[Throwable, Unit] => // [4]
gracefulShutdownStartedRef
.get
.flatMap {
case true => stoppedDeferred.complete(result) // [5]
case false => IO.pure(result).rethrow // [6]
}
}
.uncancelable // [7]
} { case ((consumer, closeConsumer), exitCase) => // [8]
(exitCase match {
case Outcome.Errored(e) => handleError(e) // [9]
case _ =>
for {
_ <- gracefulShutdownStartedRef.set(true) // [10]
_ <- consumer.stopConsuming // [11]
stopResult <-
stoppedDeferred
.get // [12]
.timeoutTo(
10.seconds,
IO.pure(Left(new RuntimeException("Graceful shutdown timed out")))
) // [13]
_ <- stopResult match { // [14]
case Right(()) => IO.unit
case Left(e) => handleError(e)
}
} yield ()
}).guarantee(closeConsumer) // [15]
.bracketCase {
case (consumer, _) => // [3]
run(consumer)
.attempt
.flatMap { result: Either[Throwable, Unit] => // [4]
gracefulShutdownStartedRef
.get
.flatMap {
case true => stoppedDeferred.complete(result) // [5]
case false => IO.pure(result).rethrow // [6]
}
}
.uncancelable // [7]
} {
case ((consumer, closeConsumer), exitCase) => // [8]
(exitCase match {
case Outcome.Errored(e) => handleError(e) // [9]
case _ =>
for {
_ <- gracefulShutdownStartedRef.set(true) // [10]
_ <- consumer.stopConsuming // [11]
stopResult <-
stoppedDeferred
.get // [12]
.timeoutTo(
10.seconds,
IO.pure(Left(new RuntimeException("Graceful shutdown timed out")))
) // [13]
_ <- stopResult match { // [14]
case Right(()) => IO.unit
case Left(e) => handleError(e)
}
} yield ()
}).guarantee(closeConsumer) // [15]
}
} yield ()
}
@@ -500,11 +502,11 @@ object WithGracefulShutdownExampleCE3 extends IOApp.Simple {
.use { consumer => // [1]
IO.uncancelable { poll => // [2]
for {
runFiber <- run(consumer).start // [3]
_ <- poll(runFiber.join).onCancel { // [4]
runFiber <- run(consumer).start // [3]
_ <- poll(runFiber.join).onCancel { // [4]
for {
_ <- IO(println("Starting graceful shutdown"))
_ <- consumer.stopConsuming // [5]
_ <- IO(println("Starting graceful shutdown"))
_ <- consumer.stopConsuming // [5]
shutdownOutcome <-
runFiber
.join
@@ -554,6 +556,7 @@ to finish processing partition before "releasing" it. Behavior can be enabled via

```scala mdoc:silent
object WithGracefulPartitionRevoke extends IOApp.Simple {

val run: IO[Unit] = {
def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))
Expand All @@ -580,6 +583,7 @@ object WithGracefulPartitionRevoke extends IOApp.Simple {
run(consumer)
}
}

}
```

6 changes: 3 additions & 3 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -251,9 +251,9 @@ object KafkaConsumer {
case None =>
for {
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
assignmentRef <-
assignmentRef <-
Stream.eval(Ref[F].of(Map.empty[TopicPartition, AssignmentSignals[F]]))
_ <- Stream.eval(initialEnqueue(assignmentRef, partitionsMapQueue))
_ <- Stream.eval(initialEnqueue(assignmentRef, partitionsMapQueue))
out <- Stream
.fromQueueNoneTerminated(partitionsMapQueue)
.interruptWhen(awaitTermination.attempt)
@@ -598,7 +598,7 @@ object KafkaConsumer {
dispatcher <- Dispatcher.sequential[F]
stopConsumingDeferred <- Resource.eval(Deferred[F, Unit])
withConsumer <- WithConsumer(mk, settings)
actor = {
actor = {
implicit val jitter0: Jitter[F] = jitter
implicit val logging0: Logging[F] = logging
implicit val dispatcher0: Dispatcher[F] = dispatcher
@@ -280,7 +280,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
.get
.flatMap { state =>
(partitions -- state.partitionState.keys).toList match {
case Nil => F.pure(state.partitionState)
case Nil => F.pure(state.partitionState)
case missing =>
missing
.traverse { partition =>
@@ -303,7 +303,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
ensurePartitionStateFor(Set(partition)).flatMap { partitionState =>
partitionState.get(partition) match {
case Some(ps) => F.pure((ps.queue, ps.closeSignal.get))
case None =>
case None =>
F.raiseError(new IllegalStateException(s"PartitionState not added for $partition"))
}
}
2 changes: 1 addition & 1 deletion modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala
@@ -200,7 +200,7 @@ trait BaseGenerators {
}

val genTimestamp: Gen[Timestamp] = for {
long <- Gen.choose(1L, Long.MaxValue)
long <- Gen.choose(1L, Long.MaxValue)
timestamp <- Gen.oneOf(
Seq(
Timestamp.createTime(long),
2 changes: 1 addition & 1 deletion modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala
@@ -191,7 +191,7 @@ abstract class BaseKafkaSpec extends BaseAsyncSpec with ForAllTestContainer {
val consumerProperties = defaultConsumerProperties ++ customProperties

var timeoutNanoTime = System.nanoTime + timeout.toNanos
val consumer = new KConsumer[K, V](
val consumer = new KConsumer[K, V](
consumerProperties.asJava,
keyDeserializer,
valueDeserializer
@@ -43,7 +43,7 @@ final class CommittableOffsetSpec extends BaseSpec {

assert {
val offsetAndMetadata = new OffsetAndMetadata(0L, "metadata")
val offset =
val offset =
CommittableOffset[IO](partition, offsetAndMetadata, Some("the-group"), _ => IO.unit)

offset.toString == "CommittableOffset(topic-0 -> (0, metadata), the-group)" &&
@@ -60,7 +60,7 @@ final class CommittableOffsetSpec extends BaseSpec {

assert {
val offsetAndMetadata = new OffsetAndMetadata(0L)
val offset =
val offset =
CommittableOffset[IO](partition, offsetAndMetadata, Some("the-group"), _ => IO.unit)

offset.toString == "CommittableOffset(topic-0 -> 0, the-group)" &&
44 changes: 22 additions & 22 deletions modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala
@@ -41,15 +41,15 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
.use { adminClient =>
for {
consumerGroupIds <- adminClient.listConsumerGroups.groupIds
consumerGroupId <- IO(consumerGroupIds match {
consumerGroupId <- IO(consumerGroupIds match {
case List(groupId) => groupId
case _ => fail()
})
consumerGroupListings <- adminClient.listConsumerGroups.listings
_ <- IO(assert(consumerGroupListings.size == 1))
describedConsumerGroups <- adminClient.describeConsumerGroups(consumerGroupIds)
_ <- IO(assert(describedConsumerGroups.size == 1))
_ <- IO {
_ <- IO {
adminClient.listConsumerGroups.toString should
startWith("ListConsumerGroups$")
}
@@ -64,7 +64,7 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
case _ => fail()
})
(_, consumerGroupOffsetsMap) = offsets
_ <- IO(
_ <- IO(
assert(
consumerGroupOffsetsMap
.map { case (_, offset) =>
@@ -95,7 +95,7 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
}
partition0 = new TopicPartition(topic, 0)
updatedOffset = new OffsetAndMetadata(0)
_ <- adminClient.alterConsumerGroupOffsets(
_ <- adminClient.alterConsumerGroupOffsets(
consumerGroupId,
Map(partition0 -> updatedOffset)
)
@@ -144,7 +144,7 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
_ <- IO(assert(!clusterController.isEmpty))
clusterId <- adminClient.describeCluster.clusterId
_ <- IO(assert(clusterId.nonEmpty))
_ <- IO {
_ <- IO {
adminClient.describeCluster.toString should startWith("DescribeCluster$")
}
} yield ()
@@ -161,16 +161,16 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
.resource[IO](adminClientSettings)
.use { adminClient =>
for {
cr <- IO.pure(new ConfigResource(ConfigResource.Type.TOPIC, topic))
ce = new ConfigEntry("cleanup.policy", "delete")
cr <- IO.pure(new ConfigResource(ConfigResource.Type.TOPIC, topic))
ce = new ConfigEntry("cleanup.policy", "delete")
alteredConfigs <- adminClient
.alterConfigs {
Map(cr -> List(new AlterConfigOp(ce, AlterConfigOp.OpType.SET)))
}
.attempt
_ <- IO(assert(alteredConfigs.isRight))
describedConfigs <- adminClient.describeConfigs(List(cr))
_ <- IO(
_ <- IO(
assert(
describedConfigs(cr).exists(actual =>
actual.name == ce.name && actual.value == ce.value
@@ -203,35 +203,35 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
_ <- IO(assert(topicListingsInternal.size == topicCount + 1))
topicNamesToListingsInternal <- adminClient.listTopics.includeInternal.namesToListings
_ <- IO(assert(topicNamesToListingsInternal.size == topicCount + 1))
_ <- IO {
_ <- IO {
adminClient.listTopics.toString should startWith("ListTopics$")
}
_ <- IO {
adminClient.listTopics.includeInternal.toString should
startWith("ListTopicsIncludeInternal$")
}
describedTopics <- adminClient.describeTopics(topicNames.toList)
_ <- IO(assert(describedTopics.size == topicCount))
newTopic = new NewTopic("new-test-topic", 1, 1.toShort)
preCreateNames <- adminClient.listTopics.names
_ <- IO(assert(!preCreateNames.contains(newTopic.name)))
_ <- adminClient.createTopic(newTopic)
postCreateNames <- adminClient.listTopics.names
createAgain <- adminClient.createTopics(List(newTopic)).attempt
_ <- IO(assert(createAgain.isLeft))
_ <- IO(assert(postCreateNames.contains(newTopic.name)))
describedTopics <- adminClient.describeTopics(topicNames.toList)
_ <- IO(assert(describedTopics.size == topicCount))
newTopic = new NewTopic("new-test-topic", 1, 1.toShort)
preCreateNames <- adminClient.listTopics.names
_ <- IO(assert(!preCreateNames.contains(newTopic.name)))
_ <- adminClient.createTopic(newTopic)
postCreateNames <- adminClient.listTopics.names
createAgain <- adminClient.createTopics(List(newTopic)).attempt
_ <- IO(assert(createAgain.isLeft))
_ <- IO(assert(postCreateNames.contains(newTopic.name)))
createPartitions <-
adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(4))).attempt
_ <- IO(assert(createPartitions.isRight))
describedTopics <- adminClient.describeTopics(topic :: Nil)
_ <- IO(assert(describedTopics.size == 1))
_ <- IO(
_ <- IO(
assert(describedTopics.headOption.exists(_._2.partitions.size == 4))
)
deleteTopics <- adminClient.deleteTopics(List(topic)).attempt
_ <- IO(assert(deleteTopics.isRight))
describedTopics <- adminClient.describeTopics(topic :: Nil).attempt
_ <- IO(
_ <- IO(
assert(
describedTopics.leftMap(_.getMessage()) == Left(
"This server does not host this topic-partition."
Expand All @@ -241,7 +241,7 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
deleteTopic <- adminClient.deleteTopic(newTopic.name()).attempt
_ <- IO(assert(deleteTopic.isRight))
describedTopic <- adminClient.describeTopics(newTopic.name() :: Nil).attempt
_ <- IO(
_ <- IO(
assert(
describedTopic.leftMap(_.getMessage()) == Left(
"This server does not host this topic-partition."