Skip to content

Commit

Permalink
Add generic map of properties to Producer/Consumer config (#45)
Browse files Browse the repository at this point in the history
(cherry picked from commit 78b90c4)
  • Loading branch information
cakper authored and Avasil committed Nov 8, 2018
1 parent 7c3b43d commit 25788fd
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 49 deletions.
5 changes: 4 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ Alex Gryzlov
https://github.com/clayrat

Piotr Gawryś
https://github.com/Avasil
https://github.com/Avasil

Kacper Gunia
https://github.com/cakper
13 changes: 10 additions & 3 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ import scala.concurrent.duration._
* @param observableCommitOrder is the `monix.observable.commit.order` setting.
* Specifies when the commit should happen, like before we receive the
* acknowledgement from downstream, or afterwards.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
* a value set on the case class will overwrite value set via properties.
*/
final case class KafkaConsumerConfig(
bootstrapServers: List[String],
Expand Down Expand Up @@ -237,9 +242,10 @@ final case class KafkaConsumerConfig(
retryBackoffTime: FiniteDuration,
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekToEndOnStart: Boolean) {
observableSeekToEndOnStart: Boolean,
properties: Map[String, String]) {

def toMap: Map[String,String] = Map(
def toMap: Map[String, String] = properties ++ Map(
"bootstrap.servers" -> bootstrapServers.mkString(","),
"fetch.min.bytes" -> fetchMinBytes.toString,
"group.id" -> groupId,
Expand Down Expand Up @@ -415,7 +421,8 @@ object KafkaConsumerConfig {
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart")
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
properties = Map.empty
)
}
}
27 changes: 18 additions & 9 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package monix.kafka

import java.io.File
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import monix.kafka.config._

import scala.concurrent.duration._

/** The Kafka Producer config.
Expand Down Expand Up @@ -179,6 +181,11 @@ import scala.concurrent.duration._
* @param monixSinkParallelism is the `monix.producer.sink.parallelism`
* setting indicating how many requests the [[KafkaProducerSink]]
* can execute in parallel.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
* a value set on the case class will overwrite value set via properties.
*/
case class KafkaProducerConfig(
bootstrapServers: List[String],
Expand Down Expand Up @@ -215,15 +222,10 @@ case class KafkaProducerConfig(
metricReporters: List[String],
metricsNumSamples: Int,
metricsSampleWindow: FiniteDuration,
monixSinkParallelism: Int) {
monixSinkParallelism: Int,
properties: Map[String, String]) {

def toProperties: Properties = {
val props = new Properties()
for ((k,v) <- toMap; if v != null) props.put(k,v)
props
}

def toMap: Map[String,String] = Map(
def toMap: Map[String, String] = properties ++ Map(
"bootstrap.servers" -> bootstrapServers.mkString(","),
"acks" -> acks.id,
"buffer.memory" -> bufferMemoryInBytes.toString,
Expand Down Expand Up @@ -259,6 +261,12 @@ case class KafkaProducerConfig(
"metrics.num.samples" -> metricsNumSamples.toString,
"metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString
)

def toProperties: Properties = {
val props = new Properties()
for ((k,v) <- toMap; if v != null) props.put(k,v)
props
}
}

object KafkaProducerConfig {
Expand Down Expand Up @@ -385,7 +393,8 @@ object KafkaProducerConfig {
metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList,
metricsNumSamples = config.getInt("metrics.num.samples"),
metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis,
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism")
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"),
properties = Map.empty
)
}
}
27 changes: 27 additions & 0 deletions kafka-0.10.x/src/test/scala/monix/kafka/ConfigTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package monix.kafka

import org.scalatest.FunSuite

class ConfigTest extends FunSuite {
test("overwrite properties with values from producer config") {
val config =
KafkaProducerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))

assert(
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
)
}

test("overwrite properties with values from consumer config") {
val config =
KafkaConsumerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))

assert(
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
)
}
}
13 changes: 10 additions & 3 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ import scala.concurrent.duration._
* @param observableCommitOrder is the `monix.observable.commit.order` setting.
* Specifies when the commit should happen, like before we receive the
* acknowledgement from downstream, or afterwards.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
* a value set on the case class will overwrite value set via properties.
*/
final case class KafkaConsumerConfig(
bootstrapServers: List[String],
Expand Down Expand Up @@ -237,9 +242,10 @@ final case class KafkaConsumerConfig(
retryBackoffTime: FiniteDuration,
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekToEndOnStart: Boolean) {
observableSeekToEndOnStart: Boolean,
properties: Map[String, String]) {

def toMap: Map[String,String] = Map(
def toMap: Map[String, String] = properties ++ Map(
"bootstrap.servers" -> bootstrapServers.mkString(","),
"fetch.min.bytes" -> fetchMinBytes.toString,
"group.id" -> groupId,
Expand Down Expand Up @@ -415,7 +421,8 @@ object KafkaConsumerConfig {
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart")
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
properties = Map.empty
)
}
}
27 changes: 18 additions & 9 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package monix.kafka

import java.io.File
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import monix.kafka.config._

import scala.concurrent.duration._

/** The Kafka Producer config.
Expand Down Expand Up @@ -179,6 +181,11 @@ import scala.concurrent.duration._
* @param monixSinkParallelism is the `monix.producer.sink.parallelism`
* setting indicating how many requests the [[KafkaProducerSink]]
* can execute in parallel.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
* a value set on the case class will overwrite value set via properties.
*/
case class KafkaProducerConfig(
bootstrapServers: List[String],
Expand Down Expand Up @@ -215,15 +222,10 @@ case class KafkaProducerConfig(
metricReporters: List[String],
metricsNumSamples: Int,
metricsSampleWindow: FiniteDuration,
monixSinkParallelism: Int) {
monixSinkParallelism: Int,
properties: Map[String, String]) {

def toProperties: Properties = {
val props = new Properties()
for ((k,v) <- toMap; if v != null) props.put(k,v)
props
}

def toMap: Map[String,String] = Map(
def toMap: Map[String, String] = properties ++ Map(
"bootstrap.servers" -> bootstrapServers.mkString(","),
"acks" -> acks.id,
"buffer.memory" -> bufferMemoryInBytes.toString,
Expand Down Expand Up @@ -259,6 +261,12 @@ case class KafkaProducerConfig(
"metrics.num.samples" -> metricsNumSamples.toString,
"metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString
)

def toProperties: Properties = {
val props = new Properties()
for ((k,v) <- toMap; if v != null) props.put(k,v)
props
}
}

object KafkaProducerConfig {
Expand Down Expand Up @@ -385,7 +393,8 @@ object KafkaProducerConfig {
metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList,
metricsNumSamples = config.getInt("metrics.num.samples"),
metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis,
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism")
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"),
properties = Map.empty
)
}
}
27 changes: 27 additions & 0 deletions kafka-0.11.x/src/test/scala/monix/kafka/ConfigTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package monix.kafka

import org.scalatest.FunSuite

class ConfigTest extends FunSuite {
test("overwrite properties with values from producer config") {
val config =
KafkaProducerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))

assert(
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
)
}

test("overwrite properties with values from consumer config") {
val config =
KafkaConsumerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))

assert(
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
)
}
}
13 changes: 10 additions & 3 deletions kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ import scala.concurrent.duration._
* @param observableCommitOrder is the `monix.observable.commit.order` setting.
* Specifies when the commit should happen, like before we receive the
* acknowledgement from downstream, or afterwards.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
* a value set on the case class will overwrite value set via properties.
*/
final case class KafkaConsumerConfig(
bootstrapServers: List[String],
Expand Down Expand Up @@ -231,9 +236,10 @@ final case class KafkaConsumerConfig(
retryBackoffTime: FiniteDuration,
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekToEndOnStart: Boolean) {
observableSeekToEndOnStart: Boolean,
properties: Map[String, String]) {

def toMap: Map[String,String] = Map(
def toMap: Map[String, String] = properties ++ Map(
"bootstrap.servers" -> bootstrapServers.mkString(","),
"fetch.min.bytes" -> fetchMinBytes.toString,
"group.id" -> groupId,
Expand Down Expand Up @@ -405,7 +411,8 @@ object KafkaConsumerConfig {
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart")
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
properties = Map.empty
)
}
}
27 changes: 18 additions & 9 deletions kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package monix.kafka

import java.io.File
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import monix.kafka.config._

import scala.concurrent.duration._

/** The Kafka Producer config.
Expand Down Expand Up @@ -175,6 +177,11 @@ import scala.concurrent.duration._
* @param monixSinkParallelism is the `monix.producer.sink.parallelism`
* setting indicating how many requests the [[KafkaProducerSink]]
* can execute in parallel.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
* a value set on the case class will overwrite value set via properties.
*/
case class KafkaProducerConfig(
bootstrapServers: List[String],
Expand Down Expand Up @@ -210,15 +217,10 @@ case class KafkaProducerConfig(
metricReporters: List[String],
metricsNumSamples: Int,
metricsSampleWindow: FiniteDuration,
monixSinkParallelism: Int) {
monixSinkParallelism: Int,
properties: Map[String, String]) {

def toProperties: Properties = {
val props = new Properties()
for ((k,v) <- toMap; if v != null) props.put(k,v)
props
}

def toMap: Map[String,String] = Map(
def toMap: Map[String, String] = properties ++ Map(
"bootstrap.servers" -> bootstrapServers.mkString(","),
"acks" -> acks.id,
"buffer.memory" -> bufferMemoryInBytes.toString,
Expand Down Expand Up @@ -253,6 +255,12 @@ case class KafkaProducerConfig(
"metrics.num.samples" -> metricsNumSamples.toString,
"metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString
)

def toProperties: Properties = {
val props = new Properties()
for ((k,v) <- toMap; if v != null) props.put(k,v)
props
}
}

object KafkaProducerConfig {
Expand Down Expand Up @@ -378,7 +386,8 @@ object KafkaProducerConfig {
metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList,
metricsNumSamples = config.getInt("metrics.num.samples"),
metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis,
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism")
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"),
properties = Map.empty
)
}
}
27 changes: 27 additions & 0 deletions kafka-0.9.x/src/test/scala/monix/kafka/ConfigTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package monix.kafka

import org.scalatest.FunSuite

class ConfigTest extends FunSuite {
test("overwrite properties with values from producer config") {
val config =
KafkaProducerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))

assert(
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
)
}

test("overwrite properties with values from consumer config") {
val config =
KafkaConsumerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))

assert(
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
)
}
}
Loading

0 comments on commit 25788fd

Please sign in to comment.