
Commit 86f97bc

Allow setting arbitrary Kafka producer configs and callback
1 parent 951d666 commit 86f97bc


4 files changed: +87 -34 lines changed


load-generator/build.gradle

Lines changed: 2 additions & 2 deletions
@@ -16,8 +16,8 @@ java {
     withSourcesJar()
 }
 
-group 'com.dynatrace'
-version '1.0-SNAPSHOT'
+group = 'com.dynatrace'
+version = '1.0-SNAPSHOT'
 
 repositories {
     mavenCentral()

load-generator/src/main/java/com/dynatrace/research/shufflebench/KafkaLoadGenerator.java

Lines changed: 62 additions & 26 deletions
@@ -2,6 +2,8 @@
 
 import com.dynatrace.hash4j.hashing.Hashing;
 import com.dynatrace.research.shufflebench.record.RandomRecordGenerator;
+import com.dynatrace.research.shufflebench.record.Record;
+import io.smallrye.config.SmallRyeConfig;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.slf4j.Logger;
@@ -10,14 +12,14 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
+import java.util.function.Consumer;
 
 public class KafkaLoadGenerator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLoadGenerator.class);
 
-  private final int executionTimeMs;
-
   private final String seedString;
 
   private final String kafkaBootstrapServers;
@@ -29,49 +31,82 @@ public class KafkaLoadGenerator {
 
   private final int recordSizeInBytes;
 
+  private final Map<String, String> kafkaProducerConfig;
+
   private final ScheduledExecutorService executor;
 
+  private final Consumer<Record> callback; // May be null
+
   private final List<KafkaSender> openKafkaSenders = new ArrayList<>();
   private final List<RecordSource> openRecordSources = new ArrayList<>();
 
+
   public KafkaLoadGenerator(
-      int executionTimeMs,
       String seedString,
       String kafkaBootstrapServers,
       String kafkaTopic,
       int numSources,
       int recordsPerSecondAndSource,
       int recordSizeInBytes,
-      int threadPoolSize
+      int threadPoolSize,
+      Map<String, String> kafkaProducerConfig
+  ) {
+    this(
+        seedString,
+        kafkaBootstrapServers,
+        kafkaTopic,
+        numSources,
+        recordsPerSecondAndSource,
+        recordSizeInBytes,
+        threadPoolSize,
+        kafkaProducerConfig,
+        null);
+  }
+
+  public KafkaLoadGenerator(
+      String seedString,
+      String kafkaBootstrapServers,
+      String kafkaTopic,
+      int numSources,
+      int recordsPerSecondAndSource,
+      int recordSizeInBytes,
+      int threadPoolSize,
+      Map<String, String> kafkaProducerConfig,
+      Consumer<Record> callback
   ) {
-    this.executionTimeMs = executionTimeMs;
     this.seedString = seedString;
     this.kafkaBootstrapServers = kafkaBootstrapServers;
    this.kafkaTopic = kafkaTopic;
     this.numSources = numSources;
     this.recordsPerSecondAndSource = recordsPerSecondAndSource;
     this.recordSizeInBytes = recordSizeInBytes;
     this.executor = new ScheduledThreadPoolExecutor(threadPoolSize);
+    this.kafkaProducerConfig = kafkaProducerConfig;
+    this.callback = callback;
   }
 
-  public void startBlocking() throws InterruptedException {
-    final KafkaSender kafkaSender = new KafkaSender(this.kafkaBootstrapServers, this.kafkaTopic);
+  public void startAsync() {
+    final KafkaSender kafkaSender = new KafkaSender(this.kafkaBootstrapServers, this.kafkaTopic, this.kafkaProducerConfig, this.callback);
     for (int sourceId = 0; sourceId < numSources; sourceId++) {
       final long seed = Hashing.komihash4_3().hashStream().putString(seedString).putInt(sourceId).getAsLong();
       final RecordSource recordSource = new RecordSource(
-        executor,
-        this.recordsPerSecondAndSource,
-        kafkaSender,
-        // new StaticRecordGenerator(),
-        new RandomRecordGenerator(seed, recordSizeInBytes),
-        "source" + sourceId);
+          executor,
+          this.recordsPerSecondAndSource,
+          kafkaSender,
+          // new StaticRecordGenerator(),
+          new RandomRecordGenerator(seed, recordSizeInBytes),
+          "source" + sourceId);
       openRecordSources.add(recordSource);
     }
     openKafkaSenders.add(kafkaSender);
-    Thread.sleep(this.executionTimeMs);
   }
 
-  public void stop() throws InterruptedException, IOException {
+  public void startBlocking(int executionTimeMs) throws InterruptedException {
+    this.startAsync();
+    Thread.sleep(executionTimeMs);
+  }
+
+  public void stop() {
     for (RecordSource recordSource : openRecordSources) {
       recordSource.stop();
     }
@@ -81,30 +116,31 @@ public void stop() throws InterruptedException, IOException {
     }
     openKafkaSenders.clear();
     executor.shutdown();
-    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
-  public static void main(String[] args) throws InterruptedException, IOException {
-    final Config config = ConfigProvider.getConfig();
+  public static void main(String[] args) throws InterruptedException {
+    final SmallRyeConfig config = ConfigProvider.getConfig().unwrap(SmallRyeConfig.class);
     final KafkaLoadGenerator kafkaLoadGenerator = new KafkaLoadGenerator(
-        config.getValue("execution.time.ms", Integer.class),
        config.getValue("seed.string", String.class),
         config.getValue("kafka.bootstrap.servers", String.class),
         config.getValue("kafka.topic", String.class),
         config.getValue("num.sources", Integer.class),
         config.getValue("num.records.per.source.second", Integer.class),
         config.getValue("record.size.bytes", Integer.class),
-        config.getValue("thread.pool.size", Integer.class)
+        config.getValue("thread.pool.size", Integer.class),
+        config.getOptionalValues("kafka.producer", String.class, String.class).orElse(Map.of())
     );
+    int executionTimeMs = config.getValue("execution.time.ms", Integer.class);
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
       LOGGER.info("Shut down load generator.");
-      try {
-        kafkaLoadGenerator.stop();
-      } catch (InterruptedException | IOException e) {
-        throw new RuntimeException(e);
-      }
+      kafkaLoadGenerator.stop();
     }));
-    kafkaLoadGenerator.startBlocking();
+    kafkaLoadGenerator.startBlocking(executionTimeMs);
     kafkaLoadGenerator.stop();
   }
 }
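
Note: with this change the load generator can also be embedded programmatically and given extra producer settings plus a per-record callback. The following is a minimal sketch only; it assumes the constructor and startBlocking(int) signature introduced above, and the concrete config values and the record counter are illustrative, not part of the commit.

package com.dynatrace.research.shufflebench;

import com.dynatrace.research.shufflebench.record.Record;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class EmbeddedLoadGeneratorExample {

  public static void main(String[] args) throws InterruptedException {
    // Illustrative callback: count every record reported back by the sender.
    AtomicLong sentRecords = new AtomicLong();
    Consumer<Record> callback = record -> sentRecords.incrementAndGet();

    KafkaLoadGenerator generator = new KafkaLoadGenerator(
        "shuffle-load-generator-0",            // seedString
        "localhost:9092",                      // kafkaBootstrapServers
        "input",                               // kafkaTopic
        1,                                     // numSources
        1000,                                  // recordsPerSecondAndSource
        1024,                                  // recordSizeInBytes
        4,                                     // threadPoolSize (example value)
        Map.of("linger.ms", "100",             // arbitrary Kafka producer configs
            "compression.type", "lz4"),
        callback);                             // may be null if no callback is needed

    generator.startBlocking(60_000);           // generate load for one minute
    generator.stop();
    System.out.println("Records acknowledged: " + sentRecords.get());
  }
}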

load-generator/src/main/java/com/dynatrace/research/shufflebench/KafkaSender.java

Lines changed: 20 additions & 6 deletions
@@ -2,14 +2,12 @@
 
 import com.dynatrace.research.shufflebench.record.Record;
 import com.dynatrace.research.shufflebench.record.RecordSerializer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.*;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.Closeable;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
@@ -22,18 +20,34 @@ public class KafkaSender implements Consumer<Record>, Closeable {
 
   private final String topic;
 
-  public KafkaSender(final String bootstrapServers, final String topic) {
+  private final Consumer<Record> callback; // May be null
+
+  public KafkaSender(final String bootstrapServers, Map<String, String> kafkaProducerConfig, final String topic) {
+    this(bootstrapServers, topic, kafkaProducerConfig, null);
+  }
+
+  public KafkaSender(final String bootstrapServers, final String topic, Map<String, String> kafkaProducerConfig, final Consumer<Record> callback) {
     Properties properties = new Properties();
     properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    properties.putAll(kafkaProducerConfig);
     this.kafkaProducer = new KafkaProducer<>(properties, new NullKeySerializer(), new RecordSerializer());
     this.topic = topic;
+    this.callback = callback;
   }
 
   @Override
   public void accept(Record record) {
     ProducerRecord<Void, Record> producerRecord = new ProducerRecord<>(this.topic, record);
     LOGGER.debug("Send record to Kafka: {}", producerRecord);
-    Future<RecordMetadata> result = this.kafkaProducer.send(producerRecord);
+    Future<RecordMetadata> result = this.produceKafkaRecord(producerRecord);
+  }
+
+  private Future<RecordMetadata> produceKafkaRecord(ProducerRecord<Void, Record> producerRecord) {
+    if (callback == null) {
+      return this.kafkaProducer.send(producerRecord);
+    } else {
+      return this.kafkaProducer.send(producerRecord, (metadata, exception) -> callback.accept(producerRecord.value()));
+    }
   }
 
   @Override
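
The new produceKafkaRecord method relies on the standard KafkaProducer.send(record, Callback) overload: when a Consumer<Record> is configured, the producer callback forwards the record value once the broker acknowledges the send (the exception argument is not inspected, so it also fires for failed sends). Below is a minimal standalone sketch of that send-with-callback pattern; the topic name, serializers, and config values are placeholders and not part of this commit.

package com.dynatrace.research.shufflebench;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SendWithCallbackSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); // example of an arbitrary producer config

    try (KafkaProducer<String, String> producer =
             new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
      ProducerRecord<String, String> record = new ProducerRecord<>("input", "hello");
      // The second argument is a Kafka Callback; it runs when the send is acknowledged or fails.
      producer.send(record, (metadata, exception) -> {
        if (exception == null) {
          System.out.println("Acknowledged at offset " + metadata.offset());
        } else {
          System.err.println("Send failed: " + exception.getMessage());
        }
      });
    }
  }
}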

load-generator/src/main/resources/META-INF/microprofile-config.properties

Lines changed: 3 additions & 0 deletions
@@ -2,6 +2,9 @@ execution.time.ms=604800000
 seed.string=shuffle-load-generator-0
 kafka.bootstrap.servers=localhost:9092
 kafka.topic=input
+# Set any Kafka producer property with: kafka.producer."<...>"=<...>
+#kafka.producer."linger.ms"=100
+# To set via environment variable: KAFKA_PRODUCER__LINGER_MS__=100
 num.sources=1
 num.records.per.source.second=1000
 record.size.bytes=1024
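
For reference, the kafka.producer."..." entries are picked up in KafkaLoadGenerator.main() via SmallRyeConfig's getOptionalValues("kafka.producer", String.class, String.class), which collects every property under that prefix into a Map<String, String> (the quoted segment becomes the map key, e.g. linger.ms -> 100) that is then passed on to the Kafka producer. A small sketch of that lookup, assuming the properties file above is on the classpath:

package com.dynatrace.research.shufflebench;

import io.smallrye.config.SmallRyeConfig;
import org.eclipse.microprofile.config.ConfigProvider;

import java.util.Map;

public class ProducerConfigLookupSketch {

  public static void main(String[] args) {
    SmallRyeConfig config = ConfigProvider.getConfig().unwrap(SmallRyeConfig.class);

    // Collects e.g. kafka.producer."linger.ms"=100 into {"linger.ms" -> "100"};
    // falls back to an empty map when no kafka.producer.* properties are set.
    Map<String, String> producerConfig =
        config.getOptionalValues("kafka.producer", String.class, String.class)
            .orElse(Map.of());

    producerConfig.forEach((key, value) ->
        System.out.println("producer config: " + key + " = " + value));
  }
}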
