Skip to content

Commit 5f269df

Browse files
author
Aliaksandr.Shpak
committed
- Remove group name parameter as not necessary
1 parent 46b63bd commit 5f269df

File tree

5 files changed

+30
-47
lines changed

5 files changed

+30
-47
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version = 0.1.1
1+
version = 0.1.2
22
url = https://api.github.com/repos/alshpak/kafka_data_viewer/releases/latest
33
update_url = https://github.com/alshpak/kafka_data_viewer/releases

src/main/scala/devtools/kafka_data_viewer/AppSettings.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ object AppSettings {
4747
def withres[T <: AutoCloseable, R](t: T)(f: T => R): R = try f(t) finally t.close()
4848

4949

50-
def connect(defaultGroup: String): AppSettings = {
50+
def connect(): AppSettings = {
5151

52-
val $ = new DisposeStore()
52+
val $ = new DisposeStore()
5353

5454
val appPropsFile = new File("application.setting.yml")
5555
if (!appPropsFile.exists()) {
@@ -127,7 +127,6 @@ object AppSettings {
127127
root.listNodes[ConnectionDefinition]("connections", connections, ConnectionDefinition(), (item, node) => {
128128
node.property("name", item.name)
129129
node.property("kafkaHost", item.kafkaHost)
130-
node.property("group", item.group, defaultGroup)
131130
node.listStrings("avroRegistries", item.avroRegistries)
132131
node.listCustom("topicsSettings", item.topicSettings)
133132
})

src/main/scala/devtools/kafka_data_viewer/KafkaDataViewer.scala

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,21 @@ import devtools.lib.rxui.FxRender.DefaultFxRenderes
2020
import devtools.lib.rxui.UiImplicits._
2121
import devtools.lib.rxui._
2222
import io.reactivex.schedulers.Schedulers
23-
import org.apache.commons.cli.{DefaultParser, Options}
2423
import org.apache.kafka.common.errors.InterruptException
2524

2625
import scala.concurrent.{ExecutionContextExecutor, Future}
2726
import scala.language.postfixOps
2827

2928
class KafkaDataViewerAppPane(val layoutData: String = "",
3029
connections: BehaviorSubject[Seq[ConnectionDefinition]],
31-
filters: BehaviorSubject[Seq[FilterData]],
32-
defaultGroup: String
30+
filters: BehaviorSubject[Seq[FilterData]]
3331
)(implicit uiRenderer: UiRenderer) extends UiObservingComponent {
3432

3533
case class ConnectionsSet(logging: ConsumerConnection, read: ConsumerConnection, master: ConsumerConnection, producer: ProducerConnection)
3634

3735
private def connectToServices(connDef: ConnectionDefinition) = {
3836
val connector = new KafkaConnector(
39-
host = connDef.kafkaHost.value,
40-
groupName = connDef.group.value)
37+
host = connDef.kafkaHost.value)
4138

4239
ConnectionsSet(logging = connector.connectConsumer(), read = connector.connectConsumer(), master = connector.connectConsumer(), producer = connector.connectProducer())
4340
}
@@ -117,8 +114,7 @@ class KafkaDataViewerAppPane(val layoutData: String = "",
117114
UiTab(label = "Connections List",
118115
content = new KafkaConnectionsListPane(
119116
connections = connections,
120-
onConnect = onConnect,
121-
defaultGroup = defaultGroup
117+
onConnect = onConnect
122118
)))),
123119
UiTabPanelExt[ConnHandle](
124120
tabs = connectOps,
@@ -321,7 +317,6 @@ object KafkaDataViewer {
321317
case class ConnectionDefinition(
322318
name: BehaviorSubject[String] = behaviorSubject(""),
323319
kafkaHost: BehaviorSubject[String] = behaviorSubject(""),
324-
group: BehaviorSubject[String] = behaviorSubject(""),
325320
topicSettings: BehaviorSubject[Seq[(String, MessageType)]] = behaviorSubject(Seq()),
326321
avroRegistries: BehaviorSubject[Seq[String]] = behaviorSubject(Seq()))
327322

@@ -333,19 +328,18 @@ object KafkaDataViewer {
333328

334329
def main(args: Array[String]): Unit = {
335330

336-
val options = new Options()
337-
options.addOption("n", "groupname", true, "Group name to connect to kafka server")
338-
339-
val cli = new DefaultParser().parse(options, args)
340-
val groupName = cli.getOptionValue("n")
341-
342-
val defaultGroup = Option(groupName).getOrElse("local.connection")
343-
val appSettings = AppSettings.connect(defaultGroup = defaultGroup)
331+
// val options = new Options()
332+
// options.addOption("n", "groupname", true, "Group name to connect to kafka server")
333+
//
334+
// val cli = new DefaultParser().parse(options, args)
335+
// val groupName = cli.getOptionValue("n")
336+
//
337+
// val defaultGroup = Option(groupName).getOrElse("local.connection")
338+
val appSettings = AppSettings.connect()
344339

345340
DefaultFxRenderes.runApp(root = new KafkaDataViewerAppPane(
346341
connections = appSettings.connections,
347-
filters = appSettings.filters,
348-
defaultGroup = defaultGroup
342+
filters = appSettings.filters
349343
), postAction = uiRenderer => {
350344
implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
351345
Future { AppUpdate.verify() }.onComplete(r => {

src/main/scala/devtools/kafka_data_viewer/kafkaconn/KafkaConnector.scala

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package devtools.kafka_data_viewer.kafkaconn
22

33
import java.time.Duration.ofSeconds
4-
import java.time.{Duration, Instant}
4+
import java.time.Instant
55
import java.util
66
import java.util.concurrent.Semaphore
77
import java.util.concurrent.atomic.AtomicInteger
@@ -13,29 +13,28 @@ import devtools.lib.rxui.DisposeStore
1313
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
1414
import org.apache.kafka.clients.producer.{KafkaProducer, Partitioner, ProducerConfig, ProducerRecord}
1515
import org.apache.kafka.common.errors.InterruptException
16-
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
16+
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
1717
import org.apache.kafka.common.{Cluster, PartitionInfo, TopicPartition}
1818

1919
import scala.collection.JavaConverters._
2020
import scala.collection.mutable
2121
import scala.collection.mutable.ArrayBuffer
2222
import scala.language.postfixOps
2323

24-
class KafkaConnector(host: String,
25-
groupName: String) extends Connector {
24+
class KafkaConnector(host: String) extends Connector {
2625

2726

2827
private val consumers = ArrayBuffer[ConsumerConnection]()
2928
private val producers = ArrayBuffer[ProducerConnection]()
3029

3130
override def connectConsumer(): ConsumerConnection = {
32-
val connection = new KafkaConsumerConnection(host, groupName)
31+
val connection = new KafkaConsumerConnection(host)
3332
consumers += connection
3433
connection
3534
}
3635

3736
override def connectProducer(): ProducerConnection = {
38-
val connection = new KafkaProducerConnection(host, groupName)
37+
val connection = new KafkaProducerConnection(host)
3938
producers += connection
4039
connection
4140
}
@@ -51,12 +50,10 @@ object Counter {
5150
val counter = new AtomicInteger(0)
5251
}
5352

54-
class KafkaConsumerConnection(host: String,
55-
groupName: String) extends ConsumerConnection {
53+
class KafkaConsumerConnection(host: String) extends ConsumerConnection {
5654

5755
private val consumerProps = Map(
5856
"bootstrap.servers" -> host
59-
, "group.id" -> groupName
6057
, "key.deserializer" -> classOf[ByteArrayDeserializer].getName
6158
, "value.deserializer" -> classOf[ByteArrayDeserializer].getName
6259
, "enable.auto.commit" -> "false"
@@ -140,8 +137,8 @@ class KafkaConsumerConnection(host: String,
140137
}
141138

142139
val nextRecords: Unit => Seq[GenRecord] = _ =>
143-
if (assignment.isEmpty) {try Thread.sleep(1000) catch { case e: InterruptedException => }; Seq() }
144-
else try consumer.poll(ofSeconds(1)).asScala.toSeq catch { case e: InterruptException => Seq() }
140+
if (assignment.isEmpty) {try Thread.sleep(1000) catch {case e: InterruptedException =>}; Seq() }
141+
else try consumer.poll(ofSeconds(1)).asScala.toSeq catch {case e: InterruptException => Seq()}
145142

146143

147144
for (records <- Stream.continually(reassign.andThen(nextRecords)(getNewTopics())).takeWhile(_ => !closed))
@@ -203,8 +200,7 @@ class KafkaConsumerConnection(host: String,
203200

204201
}
205202

206-
class KafkaProducerConnection(host: String,
207-
groupName: String) extends ProducerConnection {
203+
class KafkaProducerConnection(host: String) extends ProducerConnection {
208204

209205
private val producerProps = Map(
210206
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> host,
@@ -217,9 +213,9 @@ class KafkaProducerConnection(host: String,
217213

218214
override def send(topic: String, key: Array[Byte], value: Array[Byte], partition: PartitionerMode): Unit = {
219215
try {
220-
println("Try to send message!")
221-
CustomPartitioner.partitionMode = partition
222-
producer.send(new ProducerRecord(topic, key, value))
216+
println("Try to send message!")
217+
CustomPartitioner.partitionMode = partition
218+
producer.send(new ProducerRecord(topic, key, value))
223219
} catch {
224220
case e: Exception => e.printStackTrace()
225221
}

src/main/scala/devtools/kafka_data_viewer/ui/KafkaConnectionsListPane.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import scala.language.postfixOps
1111

1212
class KafkaConnectionsListPane(val layoutData: String = "",
1313
connections: BehaviorSubject[Seq[ConnectionDefinition]],
14-
onConnect: Subject[ConnectionDefinition],
15-
defaultGroup: String
14+
onConnect: Subject[ConnectionDefinition]
1615
)(implicit uiRenderer: UiRenderer) extends UiObservingComponent {
1716

1817
private val onAdd = publishSubject[Unit]()
@@ -24,7 +23,6 @@ class KafkaConnectionsListPane(val layoutData: String = "",
2423
val applyHandle = publishSubject[Unit]()
2524
val closeHandle = publishSubject[Unit]()
2625
val newConnection = ConnectionDefinition()
27-
newConnection.group << defaultGroup
2826
for (conn <- $(applyHandle)) connections << connections.value :+ newConnection
2927
uiRenderer.runModal(new ConfigureConnectionWindow("", newConnection, applyHandle, closeHandle), close = closeHandle)
3028
}
@@ -72,24 +70,20 @@ class ConfigureConnectionWindow(
7270
for (_ <- $(onOk)) {
7371
conn.name << name.value.trim
7472
conn.kafkaHost << host.value.trim
75-
conn.group << group.value.trim
7673
onApply onNext Unit
7774
onClose onNext Unit
7875
}
7976
private val name = behaviorSubject(conn.name.value)
8077
private val host = behaviorSubject[String](conn.kafkaHost.value)
81-
private val group = behaviorSubject[String](conn.group.value)
8278

83-
private val applyAllowed = Observable.combineLatest(name, host, group)
84-
.map(x => !x._1.trim.isEmpty && !x._2.trim.isEmpty && !x._3.trim.isEmpty)
79+
private val applyAllowed = Observable.combineLatest(name, host)
80+
.map(x => !x._1.trim.isEmpty && !x._2.trim.isEmpty)
8581

8682
override def content(): UiWidget = UiPanel(layoutData, Grid("cols 2,margin 5"), items = Seq(
8783
UiLabel(text = "Connection Name"),
8884
UiText("growx", text = name),
8985
UiLabel(text = "Kafka Host"),
9086
UiText("growx", text = host),
91-
UiLabel(text = "Consumer Group"),
92-
UiText("growx", text = group),
9387
UiSeparator("colspan 2, growx, graby, valign T", orientation = UiHoriz),
9488
UiPanel("colspan 2, halign R", Grid("cols 2,margin 2"), items = Seq(
9589
UiButton(text = "Cancel", onAction = onClose, cancelButton = true),

0 commit comments

Comments
 (0)