Skip to content

Commit 1f1e28f

Browse files
Merge pull request #60 from stefanDeveloper/update-benchmark-tests
Update benchmark tests
2 parents 7584301 + 0f3ddf9 commit 1f1e28f

File tree

7 files changed

+378
-301
lines changed

7 files changed

+378
-301
lines changed

docker/benchmark_tests/run_test.py

Lines changed: 85 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -210,39 +210,96 @@ def __init__(
210210
self.interval_lengths.append(normal_rate_interval_length)
211211

212212

213-
class LongTermTest(ScalabilityTest):
214-
"""Starts with a low rate and increases the rate in fixed intervals."""
213+
class LongTermTest:
    """Keeps a consistent rate for a long time.

    Produces randomly generated loglines to Kafka at a fixed target rate
    (``msg_per_sec``) until ``full_length_in_min`` minutes have elapsed.
    """

    def __init__(self, full_length_in_min: float | int, msg_per_sec: float | int):
        """
        Args:
            full_length_in_min: Total test duration in minutes.
            msg_per_sec: Target message rate in messages per second.
        """
        self.dataset_generator = DatasetGenerator()
        self.kafka_producer = SimpleKafkaProduceHandler()

        self.msg_per_sec = msg_per_sec
        self.full_length_in_min = full_length_in_min

    def execute(self):
        """Executes the test with the configured parameters."""
        start_timestamp = datetime.datetime.now()
        logger.warning(
            f"Start {self.full_length_in_min} minute-test with "
            f"rate {self.msg_per_sec} msg/sec at: {start_timestamp}"
        )

        cur_index = 0
        while datetime.datetime.now() - start_timestamp < datetime.timedelta(
            minutes=self.full_length_in_min
        ):
            try:
                self.kafka_producer.produce(
                    PRODUCE_TO_TOPIC,
                    self.dataset_generator.generate_random_logline(),
                )
                logger.info(
                    f"Sent message {cur_index + 1} at: {datetime.datetime.now()}"
                )
                cur_index += 1
            except KafkaError as err:
                # Fix: the original logged the KafkaError *class* object
                # (logger.warning(KafkaError)), which carries no information
                # about the actual failure. Log the caught instance instead.
                logger.warning(err)
            # Pace the loop to approximate the target rate. NOTE(review):
            # this ignores the time spent producing, so the effective rate
            # is slightly below msg_per_sec for high rates.
            time.sleep(1.0 / self.msg_per_sec)

        logger.warning(
            f"Stop at: {datetime.datetime.now()}, sent {cur_index} messages in the "
            f"past {(datetime.datetime.now() - start_timestamp).total_seconds() / 60} minutes."
        )
218252

219-
self.msg_per_sec_in_intervals = [msg_per_sec]
220-
self.interval_lengths = [full_length]
221253

254+
class MaximumThroughputTest(LongTermTest):
    """Keeps a consistent rate that is too high to be handled."""

    def __init__(self, length_in_min: float | int, msg_per_sec: int = 10000):
        """
        Args:
            length_in_min: Total test duration in minutes.
            msg_per_sec: Deliberately excessive target rate (default 10000).
        """
        # Delegate entirely to LongTermTest, which drives the
        # constant-rate produce loop.
        super().__init__(
            full_length_in_min=length_in_min,
            msg_per_sec=msg_per_sec,
        )
259+
260+
261+
def main(test_type_nr):
224262
"""Creates the test instance and executes the test."""
225-
# ramp_up_test = RampUpTest(
226-
# msg_per_sec_in_intervals=[1, 10, 50, 100, 150],
227-
# interval_length_in_sec=[10, 5, 4, 4, 2],
228-
# )
229-
# ramp_up_test.execute()
230-
231-
burst_test = BurstTest(
232-
normal_rate_msg_per_sec=20,
233-
burst_rate_msg_per_sec=10000,
234-
normal_rate_interval_length=10,
235-
burst_rate_interval_length=2,
236-
number_of_intervals=3,
237-
)
238-
burst_test.execute()
239-
240-
# long_term_test = LongTermTest(
241-
# full_length=10.4,
242-
# msg_per_sec=15,
243-
# )
244-
# long_term_test.execute()
263+
match test_type_nr:
264+
case 1:
265+
ramp_up_test = RampUpTest(
266+
msg_per_sec_in_intervals=[1, 10, 50, 100, 150, 200],
267+
interval_length_in_sec=[30, 30, 30, 30, 30, 30],
268+
)
269+
ramp_up_test.execute()
270+
271+
case 2:
272+
burst_test = BurstTest(
273+
normal_rate_msg_per_sec=20,
274+
burst_rate_msg_per_sec=10000,
275+
normal_rate_interval_length=10,
276+
burst_rate_interval_length=2,
277+
number_of_intervals=3,
278+
)
279+
burst_test.execute()
280+
281+
case 3:
282+
maximum_throughput_test = MaximumThroughputTest(
283+
length_in_min=1,
284+
)
285+
maximum_throughput_test.execute()
286+
287+
case 4:
288+
long_term_test = LongTermTest(
289+
full_length_in_min=10,
290+
msg_per_sec=15,
291+
)
292+
long_term_test.execute()
293+
294+
case _:
295+
pass
245296

246297

247298
if __name__ == "__main__":
    # Test type numbers accepted by main():
    #   1 - Ramp-up test
    #   2 - Burst test
    #   3 - Maximum throughput test
    #   4 - Long-term test
    main(1)
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
services:
2+
zookeeper:
3+
image: confluentinc/cp-zookeeper:7.3.2
4+
networks:
5+
- heidgaf
6+
environment:
7+
ZOOKEEPER_CLIENT_PORT: 2181
8+
ZOOKEEPER_SERVER_ID: 1
9+
healthcheck:
10+
test: [ "CMD-SHELL", "nc -z localhost 2181" ]
11+
interval: 10s
12+
timeout: 5s
13+
retries: 3
14+
deploy:
15+
placement:
16+
constraints: [ node.hostname == heidgaf-1 ]
17+
restart_policy:
18+
condition: on-failure
19+
20+
kafka1:
21+
image: confluentinc/cp-kafka:7.3.2
22+
networks:
23+
- heidgaf
24+
ports:
25+
- "8097:8097"
26+
- "29092:29092"
27+
depends_on:
28+
- zookeeper
29+
healthcheck:
30+
test: [ "CMD-SHELL", "nc -z localhost 8097" ]
31+
interval: 30s
32+
timeout: 10s
33+
retries: 10
34+
environment:
35+
KAFKA_BROKER_ID: 1
36+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
37+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
38+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
39+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://tasks.kafka1:8097,DOCKER://host.docker.internal:29092
40+
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
41+
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
42+
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
43+
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
44+
deploy:
45+
placement:
46+
constraints: [ node.hostname == heidgaf-1 ]
47+
restart_policy:
48+
condition: on-failure
49+
50+
51+
kafka2:
52+
image: confluentinc/cp-kafka:7.3.2
53+
networks:
54+
- heidgaf
55+
ports:
56+
- "8098:8098"
57+
- "29093:29093"
58+
depends_on:
59+
- zookeeper
60+
healthcheck:
61+
test: [ "CMD-SHELL", "nc -z localhost 8098" ]
62+
interval: 30s
63+
timeout: 10s
64+
retries: 10
65+
environment:
66+
KAFKA_BROKER_ID: 2
67+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
68+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
69+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://tasks.kafka2:8098,DOCKER://host.docker.internal:29093
70+
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
71+
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
72+
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
73+
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
74+
deploy:
75+
placement:
76+
constraints: [ node.hostname == heidgaf-1 ]
77+
restart_policy:
78+
condition: on-failure
79+
80+
kafka3:
81+
image: confluentinc/cp-kafka:7.3.2
82+
networks:
83+
- heidgaf
84+
ports:
85+
- "8099:8099"
86+
- "29094:29094"
87+
depends_on:
88+
- zookeeper
89+
healthcheck:
90+
test: [ "CMD-SHELL", "nc -z localhost 8099" ]
91+
interval: 30s
92+
timeout: 10s
93+
retries: 10
94+
environment:
95+
KAFKA_BROKER_ID: 3
96+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
97+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
98+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://tasks.kafka3:8099,DOCKER://host.docker.internal:29094
99+
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
100+
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
101+
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
102+
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
103+
deploy:
104+
placement:
105+
constraints: [ node.hostname == heidgaf-1 ]
106+
restart_policy:
107+
condition: on-failure
108+
109+
networks:
110+
heidgaf:
111+
external: true
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
services:
2+
clickhouse-server:
3+
image: clickhouse/clickhouse-server:24.3.12.75-alpine
4+
volumes:
5+
- ../create_tables:/docker-entrypoint-initdb.d
6+
networks:
7+
- heidgaf
8+
ports:
9+
- "8123:8123"
10+
- "9000:9000"
11+
# healthcheck:
12+
# test: [ "CMD-SHELL", "nc -z localhost 8123" ]
13+
# interval: 10s
14+
# timeout: 5s
15+
# retries: 3
16+
deploy:
17+
placement:
18+
constraints: [ node.hostname == heidgaf-3 ]
19+
restart_policy:
20+
condition: on-failure
21+
22+
grafana:
23+
image: grafana/grafana:11.2.2-security-01
24+
networks:
25+
- heidgaf
26+
ports:
27+
- "3000:3000"
28+
volumes:
29+
- ../grafana-provisioning/dashboards:/etc/grafana/provisioning/dashboards
30+
- ../grafana-provisioning/dashboards/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml
31+
- ../grafana-provisioning/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
32+
environment:
33+
- GF_SECURITY_ADMIN_USER=admin
34+
- GF_SECURITY_ADMIN_PASSWORD=admin
35+
- GF_INSTALL_PLUGINS=grafana-clickhouse-datasource
36+
# healthcheck:
37+
# test: [ "CMD-SHELL", "nc -z localhost 3000" ]
38+
# interval: 10s
39+
# timeout: 5s
40+
# retries: 3
41+
deploy:
42+
placement:
43+
constraints: [ node.hostname == heidgaf-3 ]
44+
restart_policy:
45+
condition: on-failure
46+
47+
monitoring_agent:
48+
image: stefan96/heidgaf-monitoring
49+
networks:
50+
- heidgaf
51+
environment:
52+
- GROUP_ID=monitoring_agent
53+
depends_on:
54+
- kafka1
55+
- kafka2
56+
- kafka3
57+
- clickhouse-server
58+
deploy:
59+
placement:
60+
constraints: [ node.hostname == heidgaf-3 ]
61+
restart_policy:
62+
condition: on-failure
63+
64+
networks:
65+
heidgaf:
66+
external: true

0 commit comments

Comments
 (0)