Merged
Changes from all commits
Commits
219 commits
cd3c406
Add dev script for sending mock log lines into the pipeline
lamr02n Dec 23, 2024
6e676b5
Add grafana container including provisioning
lamr02n Dec 23, 2024
98b4eed
Update log_generator.py list of record types
lamr02n Dec 23, 2024
3dd9864
Add first draft of dashboard
lamr02n Dec 23, 2024
d4f98b8
Update dashboard
lamr02n Dec 31, 2024
915a35d
Update dashboard
lamr02n Jan 1, 2025
0f15302
Update dashboard
lamr02n Jan 1, 2025
021bef5
Update dashboard
lamr02n Jan 1, 2025
56f0a6e
Update dashboard
lamr02n Jan 1, 2025
b658aab
Update dashboard
lamr02n Jan 1, 2025
940781e
Update dashboard
lamr02n Jan 1, 2025
52df552
Update dashboard
lamr02n Jan 1, 2025
2f94a0c
Update dashboard
lamr02n Jan 1, 2025
1c3f6ad
Add fill_levels table
lamr02n Jan 1, 2025
be5afb4
Update dashboard
lamr02n Jan 1, 2025
6514ed2
Update dashboard
lamr02n Jan 1, 2025
efef2c4
Update dashboard
lamr02n Jan 1, 2025
765b4e6
Change fill_level inserts on batch_handler.py and clean up some methods
lamr02n Jan 3, 2025
116b108
Swap order of fill_level insert and message adding in collector.py
lamr02n Jan 3, 2025
bc68e47
Update dashboard
lamr02n Jan 3, 2025
29a1f68
Swap order of fill_level insert and producing in inspector.py
lamr02n Jan 3, 2025
53a9fc1
Update test_buffered_batch.py
lamr02n Jan 3, 2025
c483cc6
Clean up BufferedBatch
lamr02n Jan 3, 2025
6b41238
Clean up BufferedBatch
lamr02n Jan 3, 2025
8a00494
Clean up BufferedBatchSender
lamr02n Jan 3, 2025
f3b53cd
Clean up Collector
lamr02n Jan 3, 2025
9b6f9bc
Clean up logline_handler.py
lamr02n Jan 3, 2025
2c19d3e
Update dashboard
lamr02n Jan 3, 2025
04b0286
Update dashboard
lamr02n Jan 3, 2025
ef32e1f
Update dashboard
lamr02n Jan 9, 2025
18555c4
Update config values
lamr02n Jan 9, 2025
7852af1
Rename Kafka topics
lamr02n Jan 10, 2025
7e33fcb
Update KafkaHandlers to allow for replicated modules
lamr02n Jan 10, 2025
867f5cb
Update mock_logs.dev.py
lamr02n Jan 10, 2025
77195a4
Add mock insertion for real data sets
lamr02n Jan 10, 2025
665f9e8
Remove fixed IP addresses for modules
lamr02n Jan 12, 2025
96b2b8d
Update real_logs.dev.py
lamr02n Jan 12, 2025
c47298f
Update tests for Inspector
lamr02n Jan 12, 2025
8c0c289
Add default value for HOSTNAME
lamr02n Jan 12, 2025
0efd772
Update test_exactly_once_kafka_produce_handler.py
lamr02n Jan 12, 2025
f4bbaea
Split up dashboard (1)
lamr02n Jan 12, 2025
48352e0
Update dashboard
lamr02n Jan 12, 2025
9df6d3e
Split up dashboard (2)
lamr02n Jan 12, 2025
45c614a
Update dashboard
lamr02n Jan 12, 2025
e31945f
Update dashboard
lamr02n Jan 12, 2025
80839f2
Update dashboard
lamr02n Jan 12, 2025
f9d51b5
Update dashboard
lamr02n Jan 13, 2025
c0ac734
Update fill_states.json
lamr02n Jan 13, 2025
0553216
Rename fill_states.json to log_volumes.json
lamr02n Jan 13, 2025
40a491f
Update log_volumes.json
lamr02n Jan 13, 2025
1cad44f
Update log_volumes.json
lamr02n Jan 13, 2025
fa5163a
Update log_volumes.json
lamr02n Jan 13, 2025
095971c
Update log_volumes.json
lamr02n Jan 13, 2025
e69080c
Update log_volumes.json
lamr02n Jan 14, 2025
6c5d84d
Update log_volumes.json
lamr02n Jan 14, 2025
010714e
Update latencies.json
lamr02n Jan 14, 2025
883a3bb
Update latencies.json
lamr02n Jan 14, 2025
680d067
Update latencies.json
lamr02n Jan 14, 2025
7296916
Update dashboards
lamr02n Jan 14, 2025
3fe55c6
Add alerts dashboard
lamr02n Jan 14, 2025
7f203e1
Update topic creation in kafka_handler.py
lamr02n Jan 14, 2025
752c03b
Update alerts.json
lamr02n Jan 14, 2025
a912cd4
Update alerts.json
lamr02n Jan 14, 2025
5bab8a9
Update alerts.json
lamr02n Jan 14, 2025
a6c19b4
Update alerts.json
lamr02n Jan 14, 2025
067d9af
Update log_volumes.json
lamr02n Jan 14, 2025
4d35a2b
Update alerts.json
lamr02n Jan 14, 2025
bb9d5bf
Update log_volumes.json
lamr02n Jan 14, 2025
70a98cb
Update log_volumes.json
lamr02n Jan 14, 2025
36678cc
Update alerts.json
lamr02n Jan 15, 2025
cf93d20
Update log_volumes.json
lamr02n Jan 15, 2025
edd01c8
Update alerts.json
lamr02n Jan 15, 2025
68c92d0
Move create-tables directory to docker directory
lamr02n Jan 17, 2025
42f1b4b
Add dashboard for data tests
lamr02n Jan 17, 2025
05c5b30
Move grafana to its own docker-compose
lamr02n Jan 17, 2025
10fc6ae
Add grafana docker-compose for data tests
lamr02n Jan 17, 2025
7e0a26c
Create tables through entry point instead of monitoring_agent.py
lamr02n Jan 17, 2025
b8c8cf1
Add docker-compose.datatests.yml
lamr02n Jan 17, 2025
8cc3ca9
Move pipeline to its own docker-compose
lamr02n Jan 17, 2025
ebbf60e
Add script for loading real logs into the pipeline
lamr02n Jan 17, 2025
49e80f0
Add script for converting data into json.gz files
lamr02n Jan 17, 2025
470ba7c
Add script for creating the datatest tables
lamr02n Jan 17, 2025
7893652
First functioning docker-compose setup
lamr02n Jan 17, 2025
fa91d61
Restructure docker-compose files
lamr02n Jan 17, 2025
37b2d38
Restructure .env file
lamr02n Jan 17, 2025
ffaf51d
Update datatests.json dashboard
lamr02n Jan 18, 2025
4631d25
Update script for creating .json.gz from dataset
lamr02n Jan 18, 2025
6c70c16
Update create table statements
lamr02n Jan 18, 2025
db81f05
Add LIMIT to query.dev.py
lamr02n Jan 18, 2025
9e0c858
Update test_exactly_once_kafka_consume_handler.py
lamr02n Jan 18, 2025
c827439
Update datatests.json
lamr02n Jan 21, 2025
a5528ee
Update datatests.json
lamr02n Jan 22, 2025
66ecc35
Update datatests.json
lamr02n Jan 22, 2025
9a7b0a2
Update datatests.json
lamr02n Jan 22, 2025
1cd9c20
Update alerts.json
lamr02n Jan 23, 2025
1bb3068
Update alerts.json
lamr02n Jan 23, 2025
835b9b4
Update datatests.json
lamr02n Jan 25, 2025
655cb40
Update alerts.json
lamr02n Jan 25, 2025
4fed38f
Update alerts.json
lamr02n Jan 25, 2025
c15abfe
Update alerts.json
lamr02n Jan 25, 2025
bc49f7b
Update alerts.json
lamr02n Jan 25, 2025
4cf55bd
Update alerts.json
lamr02n Jan 25, 2025
520968f
Update alerts.json
lamr02n Jan 25, 2025
a4e65cd
Update latencies.json
lamr02n Jan 25, 2025
d17cccf
Update latencies.json
lamr02n Jan 25, 2025
d2cb856
Update latencies.json
lamr02n Jan 25, 2025
1614093
Update latencies.json
lamr02n Jan 25, 2025
7b777e4
Update latencies.json
lamr02n Jan 25, 2025
1b68f33
Update latencies.json
lamr02n Jan 25, 2025
1ff728a
Update latencies.json
lamr02n Jan 25, 2025
8cb211e
Update log_volumes.json
lamr02n Jan 25, 2025
680d31d
Update log_volumes.json
lamr02n Jan 25, 2025
0eac513
Update log_volumes.json
lamr02n Jan 25, 2025
200f7e2
Update database insertion in detector.py
lamr02n Jan 25, 2025
de95e2a
Update log_volumes.json
lamr02n Jan 25, 2025
07d8c77
Update log_volumes.json
lamr02n Jan 25, 2025
f5aa6cb
Add geoip database
lamr02n Jan 25, 2025
bfef91b
Update alerts.json
lamr02n Jan 29, 2025
6799fd8
Update alerts.json
lamr02n Jan 29, 2025
3c4a0bc
Update general.json
lamr02n Jan 29, 2025
34366e7
Update datatests.json
lamr02n Jan 29, 2025
bfac2ed
Rename general.json to overview.json
lamr02n Jan 29, 2025
59cd677
Update log_volumes.json
lamr02n Jan 30, 2025
99608ef
Update log_volumes.json
lamr02n Jan 30, 2025
78a0546
Update log_volumes.json
lamr02n Jan 30, 2025
9937251
Update log_volumes.json
lamr02n Jan 30, 2025
9d767d3
Update log_volumes.json
lamr02n Jan 30, 2025
f98b5aa
Update datatests.json
lamr02n Jan 30, 2025
3a2b26a
Update datatests.json
lamr02n Jan 30, 2025
aae2b4c
Update datatests.json
lamr02n Jan 30, 2025
b47edf9
Update datatests.json
lamr02n Jan 30, 2025
b568083
Update alerts.json
lamr02n Jan 30, 2025
e493a41
Update datatests.json
lamr02n Jan 30, 2025
bf51d99
Update latencies.json
lamr02n Jan 30, 2025
0612445
Update log_volumes.json
lamr02n Jan 30, 2025
837e244
Delete overview.json
lamr02n Jan 30, 2025
1e7cb29
Update latencies.json
lamr02n Jan 30, 2025
9bf29b8
Add overview.json
lamr02n Jan 30, 2025
461e5df
Update alerts.json
lamr02n Jan 30, 2025
c2fbe57
Update overview.json
lamr02n Jan 30, 2025
454a3e4
Update overview.json
lamr02n Jan 30, 2025
1554613
Update alerts.json
lamr02n Jan 30, 2025
f7cac77
Update overview.json
lamr02n Jan 30, 2025
10c5a36
Update alerts.json
lamr02n Jan 30, 2025
713df9d
Update latencies.json
lamr02n Jan 30, 2025
3b681e0
Merge remote-tracking branch 'refs/remotes/origin/main' into add-grafana
lamr02n Feb 5, 2025
7bf07dd
Fix docker compose
lamr02n Feb 5, 2025
41f7080
Update mock logs scripts
lamr02n Feb 8, 2025
117a886
Update log_volumes.json
lamr02n Feb 8, 2025
8f1a473
Add code for running the scalability tests
lamr02n Feb 8, 2025
a910186
Ramp-up tests can now be configured more freely
lamr02n Feb 9, 2025
1c3d3dd
Update overview.json
lamr02n Feb 9, 2025
344cf0b
Update overview.json
lamr02n Feb 9, 2025
28a8d56
Finalize overview.json
lamr02n Feb 9, 2025
1fc67a4
Finalize latencies.json
lamr02n Feb 9, 2025
9e51822
Update log_volumes.json
lamr02n Feb 9, 2025
7dea435
Update log_volumes.json
lamr02n Feb 9, 2025
2fc9360
Insert logline_timestamps in Inspector for each logline individually
lamr02n Feb 9, 2025
c698073
Remove comment in prefilter.py
lamr02n Feb 9, 2025
0b6d7be
Add fill_level update in detector.py
lamr02n Feb 9, 2025
a7b429d
Insert logline_timestamps in Inspector and Detector for each logline …
lamr02n Feb 9, 2025
f200029
Update log_volumes.json
lamr02n Feb 9, 2025
3f0d6ae
Update log_volumes.json
lamr02n Feb 9, 2025
d3a16f9
Update log_volumes.json
lamr02n Feb 9, 2025
ad9a172
Update Dockerfile.dev-query
lamr02n Feb 9, 2025
7ae3efe
Remove geoip table
lamr02n Feb 9, 2025
e77dfe1
Update test classes
lamr02n Feb 9, 2025
67fc9b5
Restructure run_test.py
lamr02n Feb 10, 2025
34b6533
Implement frequencies for BurstTest
lamr02n Feb 12, 2025
3f56a87
Add docker-compose.swarm.yml
lamr02n Feb 15, 2025
b080baf
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
e85d3f6
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
d68dac2
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
e8cfe89
Update builds to images in docker-compose.swarm.yml
lamr02n Feb 15, 2025
fc56479
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
c031b8a
Update node.hostname in docker-compose.swarm.yml
lamr02n Feb 15, 2025
1204c3b
Update IP addresses in docker-compose.swarm.yml
lamr02n Feb 15, 2025
4079366
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
5c30252
Update network in docker-compose.swarm.yml
lamr02n Feb 15, 2025
c431462
Update network in docker-compose.swarm.yml
lamr02n Feb 15, 2025
14638dc
Update network in docker-compose.swarm.yml
lamr02n Feb 15, 2025
bc68a62
Prepare docker-compose.swarm.yml for testing
lamr02n Feb 15, 2025
bfdf007
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
cea6992
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
4fe8e80
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
2af2a3c
Update docker-compose.swarm.yml
lamr02n Feb 15, 2025
8f143ef
Update docker-compose.swarm.yml
lamr02n Feb 17, 2025
b8d7aa4
Update docker-compose.swarm.yml by activating Grafana
lamr02n Feb 17, 2025
b2b5e3b
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
0a5c042
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
e1f9a16
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
fc32b18
Update docker-compose.swarm.yml for testing by activating clickhouse
lamr02n Feb 17, 2025
a2c5df5
Update docker-compose.swarm.yml for testing by activating logserver
lamr02n Feb 17, 2025
44b2ebe
Update docker-compose.swarm.yml for testing by activating logcollector
lamr02n Feb 17, 2025
a596d58
Update docker-compose.swarm.yml for testing by activating pipeline co…
lamr02n Feb 17, 2025
57375ad
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
7a81b3a
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
15069ed
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
20eb258
Update docker-compose.swarm.yml for testing
lamr02n Feb 17, 2025
62ce607
Add monitoring container to be published to docker
lamr02n Feb 18, 2025
ad40844
Make use of kafka hostnames instead of IPs
lamr02n Feb 18, 2025
25f131f
Fix healthcheck in docker-compose.swarm.yml
lamr02n Feb 18, 2025
1a739ae
Use clickhouse-server hostname instead of IP address
lamr02n Feb 18, 2025
66ca276
Update datatest docker-compose files
lamr02n Feb 18, 2025
4268fe1
Update test_detector.py
lamr02n Feb 18, 2025
2fb5a6b
Update test_buffered_batch.py
lamr02n Feb 18, 2025
52fd1fb
Update test_batch_handler.py
lamr02n Feb 18, 2025
7c2050d
Update test_prefilter.py
lamr02n Feb 18, 2025
f0bce4e
Update test_exactly_once_kafka_consume_handler.py
lamr02n Feb 19, 2025
4a7b465
Update test_exactly_once_kafka_produce_handler.py
lamr02n Feb 19, 2025
ace21cf
Update test_kafka_consume_handler.py
lamr02n Feb 19, 2025
e8f0cbe
Update test_simple_kafka_consume_handler.py
lamr02n Feb 19, 2025
bebc15b
Update test_simple_kafka_produce_handler.py
lamr02n Feb 19, 2025
7f3a48e
Update test_server.py
lamr02n Feb 19, 2025
c82c501
Make all tests green
lamr02n Feb 19, 2025
49c01dd
Update test_kafka_consume_handler.py
lamr02n Feb 19, 2025
9113bb3
Update test_kafka_consume_handler.py
lamr02n Feb 19, 2025
4fd50f6
Update test_clickhouse_connector.py
lamr02n Feb 19, 2025
8d643ae
Update test_detector.py
lamr02n Feb 19, 2025
3 changes: 2 additions & 1 deletion .github/workflows/build_publish_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ jobs:
"inspector",
"logcollector",
"logserver",
"prefilter"
"prefilter",
"monitoring"
]
permissions:
contents: read
Expand Down
24 changes: 12 additions & 12 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pipeline:
- [ "response_ip", IpAddress ]
- [ "size", RegEx, '^\d+b$' ]
batch_handler:
batch_size: 1000
batch_timeout: 20.0
batch_size: 10000
batch_timeout: 30.0
subnet_id:
ipv4_prefix_length: 24
ipv6_prefix_length: 64
Expand Down Expand Up @@ -65,25 +65,25 @@ pipeline:

monitoring:
clickhouse_connector:
batch_size: 10000
batch_size: 50 # do not set higher
batch_timeout: 2.0

environment:
timestamp_format: "%Y-%m-%dT%H:%M:%S.%fZ"
kafka_brokers:
- hostname: 172.27.0.3
- hostname: kafka1
port: 8097
- hostname: 172.27.0.4
- hostname: kafka2
port: 8098
- hostname: 172.27.0.5
- hostname: kafka3
port: 8099
kafka_topics:
pipeline:
logserver_in: "pipeline.logserver_in"
logserver_to_collector: "pipeline.logserver_to_collector"
batch_sender_to_prefilter: "pipeline.batch_sender_to_prefilter"
prefilter_to_inspector: "pipeline.prefilter_to_inspector"
inspector_to_detector: "pipeline.inspector_to_detector"
logserver_in: "pipeline-logserver_in"
logserver_to_collector: "pipeline-logserver_to_collector"
batch_sender_to_prefilter: "pipeline-batch_sender_to_prefilter"
prefilter_to_inspector: "pipeline-prefilter_to_inspector"
inspector_to_detector: "pipeline-inspector_to_detector"
monitoring:
clickhouse_server:
hostname: 172.27.0.11
hostname: clickhouse-server
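The config change above swaps fixed broker IPs for the hostnames kafka1–kafka3. A minimal sketch of how such a `kafka_brokers` list can be joined into the comma-separated `bootstrap.servers` string that confluent-kafka expects (the `bootstrap_servers` helper is illustrative, not part of the repository):

```python
# Broker entries mirroring the kafka_brokers section of config.yaml above.
brokers = [
    {"hostname": "kafka1", "port": 8097},
    {"hostname": "kafka2", "port": 8098},
    {"hostname": "kafka3", "port": 8099},
]

def bootstrap_servers(brokers: list[dict]) -> str:
    """Join hostname:port pairs into a bootstrap.servers string."""
    return ",".join(f"{b['hostname']}:{b['port']}" for b in brokers)

print(bootstrap_servers(brokers))  # kafka1:8097,kafka2:8098,kafka3:8099
```

Using hostnames instead of IPs lets Docker's embedded DNS resolve brokers regardless of which address the overlay network assigns them.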
2 changes: 1 addition & 1 deletion docker/.env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MOUNT_PATH=./default.txt
MOUNT_PATH=../../default.txt
17 changes: 17 additions & 0 deletions docker/benchmark_tests/Dockerfile.run_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM python:3.11-slim-bookworm

ENV PYTHONDONTWRITEBYTECODE=1

WORKDIR /usr/src/app

RUN pip --disable-pip-version-check install --no-cache-dir --no-compile marshmallow_dataclass colorlog pyYAML confluent_kafka numpy polars scikit-learn torch

COPY src/base ./src/base
COPY src/train ./src/train
COPY config.yaml .
COPY docker/benchmark_tests .
COPY data ./data

RUN rm -rf /root/.cache

CMD [ "python", "run_test.py"]
20 changes: 20 additions & 0 deletions docker/benchmark_tests/docker-compose.run_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
services:
benchmark_test_run:
build:
context: ../..
dockerfile: docker/benchmark_tests/Dockerfile.run_test
network: host
networks:
docker_heidgaf:
deploy:
resources:
limits:
cpus: '2'
memory: 512m
reservations:
cpus: '1'
memory: 256m

networks:
docker_heidgaf:
external: true
248 changes: 248 additions & 0 deletions docker/benchmark_tests/run_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
import datetime
import ipaddress
import os
import random
import sys
import time

import polars as pl
from confluent_kafka import KafkaError

sys.path.append(os.getcwd())
from src.base.kafka_handler import SimpleKafkaProduceHandler
from src.train.dataset import Dataset, DatasetLoader
from src.base.log_config import get_logger
from src.base.utils import setup_config

logger = get_logger()
config = setup_config()

PRODUCE_TO_TOPIC = config["environment"]["kafka_topics"]["pipeline"]["logserver_in"]


class DatasetGenerator:
"""Generates log lines and datasets."""

def __init__(self, data_base_path: str = "./data"):
datasets = DatasetLoader(base_path=data_base_path, max_rows=10000)

dataset = Dataset(
data_path="",
data=pl.concat(
[
datasets.dgta_dataset.data,
# datasets.cic_dataset.data,
# datasets.bambenek_dataset.data,
# datasets.dga_dataset.data,
# datasets.dgarchive_dataset.data,
]
),
max_rows=1000,
)

self.domains = dataset.data

def generate_random_logline(
self, statuses: list[str] = None, record_types: list[str] = None
):
"""Generates a (mostly) random logline."""
if record_types is None:
record_types = 6 * ["AAAA"] + 10 * ["A"] + ["PR", "CNAME"]

if statuses is None:
statuses = ["NOERROR", "NXDOMAIN"]

# choose timestamp
timestamp = (
datetime.datetime.now() + datetime.timedelta(0, 0, random.randint(0, 900))
).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

# choose status code
status = random.choice(statuses)

# choose client IP address
number_of_subnets = 50
client_ip = (
f"192.168.{random.randint(0, number_of_subnets)}.{random.randint(1, 255)}"
)

# choose server IP address
server_ip = f"10.10.0.{random.randint(1, 100)}"

# choose random domain (can be malicious or benign)
domain = self.get_random_domain()

# choose random record type
record_type = random.choice(record_types)

# choose random response IP address
def _get_random_ipv4():
# Use the public ipaddress constructors instead of private helpers
return str(ipaddress.IPv4Address(random.randint(0, 2**32 - 1)))

def _get_random_ipv6():
return str(ipaddress.IPv6Address(random.randint(0, 2**128 - 1)))

ip_address_choices = [_get_random_ipv4(), _get_random_ipv6()]
response_ip_address = random.choice(ip_address_choices)

# choose random size
size = f"{random.randint(50, 255)}b"

return f"{timestamp} {status} {client_ip} {server_ip} {domain} {record_type} {response_ip_address} {size}"

def get_random_domain(self) -> str:
random_domain = self.domains.sample(n=1)
return random_domain["query"].item()

def generate_dataset(self, number_of_elements: int) -> list[str]:
dataset = []

for _ in range(number_of_elements):
logline = self.generate_random_logline()
dataset.append(logline)

return dataset


class ScalabilityTest:
"""Base class for tests that focus on the scalability of the software."""

def __init__(self):
self.dataset_generator = DatasetGenerator()
self.kafka_producer = SimpleKafkaProduceHandler()

self.interval_lengths = None
self.msg_per_sec_in_intervals = None

def execute(self):
"""Executes the test with the configured parameters."""
logger.warning(f"Start at: {datetime.datetime.now()}")

cur_index = 0
for i in range(len(self.msg_per_sec_in_intervals)):
cur_index = self._execute_one_interval(
cur_index=cur_index,
msg_per_sec=self.msg_per_sec_in_intervals[i],
length_in_sec=self.interval_lengths[i],
)

logger.warning(f"Stop at: {datetime.datetime.now()}")

def _execute_one_interval(
self, cur_index: int, msg_per_sec: float | int, length_in_sec: float | int
) -> int:
start_of_interval_timestamp = datetime.datetime.now()
logger.warning(
f"Start interval with {msg_per_sec} msg/s at {start_of_interval_timestamp}"
)

while (
datetime.datetime.now() - start_of_interval_timestamp
< datetime.timedelta(seconds=length_in_sec)
):
try:
self.kafka_producer.produce(
PRODUCE_TO_TOPIC,
self.dataset_generator.generate_random_logline(),
)
logger.info(
f"Sent message {cur_index + 1} at: {datetime.datetime.now()}"
)
cur_index += 1
except KafkaError as e:
logger.warning(e)
time.sleep(1.0 / msg_per_sec)

logger.warning(f"Finish interval with {msg_per_sec} msg/s")
return cur_index


class RampUpTest(ScalabilityTest):
"""Starts with a low rate and increases the rate in fixed intervals."""

def __init__(
self,
msg_per_sec_in_intervals: list[float | int],
interval_length_in_sec: int | float | list[int | float],
):
super().__init__()
self.msg_per_sec_in_intervals = msg_per_sec_in_intervals

if type(interval_length_in_sec) is list:
if len(interval_length_in_sec) != len(msg_per_sec_in_intervals):
raise ValueError("Interval list and rate list must have equal lengths.")
self.interval_lengths = interval_length_in_sec
else:
self.interval_lengths = [
interval_length_in_sec for _ in range(len(msg_per_sec_in_intervals))
]


class BurstTest(ScalabilityTest):
"""Starts at a normal rate, sends a high rate for a short period, then returns to the
normal rate. Repeats the process a defined number of times."""

def __init__(
self,
normal_rate_msg_per_sec: float | int,
burst_rate_msg_per_sec: float | int,
normal_rate_interval_length: float | int,
burst_rate_interval_length: float | int,
number_of_intervals: int = 1,
):
super().__init__()

self.msg_per_sec_in_intervals = [normal_rate_msg_per_sec]
self.interval_lengths = [normal_rate_interval_length]

for _ in range(number_of_intervals):
self.msg_per_sec_in_intervals.append(burst_rate_msg_per_sec)
self.msg_per_sec_in_intervals.append(normal_rate_msg_per_sec)

self.interval_lengths.append(burst_rate_interval_length)
self.interval_lengths.append(normal_rate_interval_length)


class LongTermTest(ScalabilityTest):
"""Sends messages at a constant rate for the full test duration."""

def __init__(self, full_length: float | int, msg_per_sec: float | int):
super().__init__()

self.msg_per_sec_in_intervals = [msg_per_sec]
self.interval_lengths = [full_length]


def main():
"""Creates the test instance and executes the test."""
# ramp_up_test = RampUpTest(
# msg_per_sec_in_intervals=[1, 10, 50, 100, 150],
# interval_length_in_sec=[10, 5, 4, 4, 2],
# )
# ramp_up_test.execute()

burst_test = BurstTest(
normal_rate_msg_per_sec=20,
burst_rate_msg_per_sec=10000,
normal_rate_interval_length=10,
burst_rate_interval_length=2,
number_of_intervals=3,
)
burst_test.execute()

# long_term_test = LongTermTest(
# full_length=10.4,
# msg_per_sec=15,
# )
# long_term_test.execute()


if __name__ == "__main__":
main()
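The BurstTest constructor above builds an alternating schedule of normal and burst intervals before `execute()` runs them. A minimal sketch of that schedule logic in isolation, with no Kafka or dataset dependencies (the `burst_schedule` function is illustrative, not part of the repository):

```python
# Mirrors BurstTest.__init__: start at the normal rate, then append
# (burst, normal) pairs for each requested interval.
def burst_schedule(normal_rate, burst_rate, normal_len, burst_len, n_intervals):
    rates = [normal_rate]
    lengths = [normal_len]
    for _ in range(n_intervals):
        rates += [burst_rate, normal_rate]
        lengths += [burst_len, normal_len]
    return rates, lengths

# Same parameters as the BurstTest instance in main() above.
rates, lengths = burst_schedule(20, 10000, 10, 2, 3)
print(rates)    # [20, 10000, 20, 10000, 20, 10000, 20]
print(lengths)  # [10, 2, 10, 2, 10, 2, 10]
```

With these parameters the test alternates 10-second phases at 20 msg/s with 2-second bursts at 10000 msg/s, which is how the pipeline's recovery from load spikes gets exercised.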
11 changes: 11 additions & 0 deletions docker/create_datatest_tables/dgta_dataset.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS dgta_dataset (
query String,
class Int32,
labels Array(String),
tld String,
fqdn String,
secondleveldomain String,
thirdleveldomain String
)
ENGINE = MergeTree
PRIMARY KEY(query);
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS alerts (
alert_timestamp DateTime64(6) NOT NULL,
suspicious_batch_id UUID NOT NULL,
overall_score Float32 NOT NULL,
domain_names String NOT NULL,
result String,
)
ENGINE = MergeTree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS dns_loglines (
status_code String NOT NULL,
client_ip String NOT NULL,
record_type String NOT NULL,
additional_fields Nullable(String)
additional_fields String
)
ENGINE = MergeTree
PRIMARY KEY (logline_id);
8 changes: 8 additions & 0 deletions docker/create_tables/fill_levels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS fill_levels (
timestamp DateTime64(6) NOT NULL,
stage String NOT NULL,
entry_type String NOT NULL,
entry_count UInt32 DEFAULT 0
)
ENGINE = MergeTree
PRIMARY KEY (timestamp, stage, entry_type);