From dc65833741f66e14ef42a7a30a7f114ceb420c72 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Mon, 24 Feb 2025 19:36:32 +0100 Subject: [PATCH 01/33] Update log_volumes.json --- .../dashboards/log_volumes.json | 95 ++++++++++--------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index abbf228c..1db3b3d2 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -896,8 +896,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 27, - "gradientMode": "opacity", + "fillOpacity": 11, + "gradientMode": "none", "hideFrom": { "legend": false, "tooltip": false, @@ -905,9 +905,6 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", - "lineStyle": { - "fill": "solid" - }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -923,6 +920,7 @@ "mode": "off" } }, + "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -930,10 +928,6 @@ { "color": "green", "value": null - }, - { - "color": "red", - "value": 80 } ] } @@ -946,13 +940,13 @@ "x": 0, "y": 19 }, - "id": 51, + "id": 50, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "mode": "single", @@ -980,34 +974,11 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Batch" - }, - { - "datasource": { - "type": "grafana-clickhouse-datasource", - "uid": "PDEE91DDB90597936" - }, - "editorType": "sql", - "format": 1, - "hide": false, - "meta": { - "builderOptions": { - "columns": [], - "database": "", - "limit": 1000, - "mode": "list", - "queryType": "table", - "table": "" - } - }, - "pluginVersion": "4.5.1", - "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Buffer" + "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "A" } ], - "title": "BatchHandler Fill State", + "title": "Collector Fill State", "type": "timeseries" }, { @@ -1491,8 +1462,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 11, - "gradientMode": "none", + "fillOpacity": 27, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, @@ -1500,6 +1471,9 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", + "lineStyle": { + "fill": "solid" + }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -1515,7 +1489,6 @@ "mode": "off" } }, - "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -1523,6 +1496,10 @@ { "color": "green", "value": null + }, + { + "color": "red", + "value": 80 } ] } @@ -1535,13 +1512,13 @@ "x": 0, "y": 25 }, - "id": 50, + "id": 51, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": false + "showLegend": true }, "tooltip": { "mode": "single", @@ -1569,11 +1546,34 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "A" + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Batch" + }, + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "PDEE91DDB90597936" + }, + "editorType": "sql", + "format": 1, + "hide": false, + "meta": { + "builderOptions": { + "columns": [], + "database": "", + "limit": 1000, + "mode": "list", + "queryType": "table", + "table": "" + } + }, + "pluginVersion": "4.5.1", + "queryType": "table", + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Buffer" } ], - "title": "Collector Fill State", + "title": "BatchHandler Fill State", "type": "timeseries" }, { @@ -2350,7 +2350,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null } ] } @@ -2548,7 +2549,7 @@ "type": "nodeGraph" } ], - "refresh": "auto", + "refresh": "5s", "schemaVersion": 39, "tags": [], "templating": { From 30dddd1051f0fdab18861f03cfc271fefd726d4e Mon Sep 17 00:00:00 2001 From: Manuel F Date: Tue, 25 Feb 2025 10:14:39 +0100 Subject: [PATCH 02/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 3f6ab4ab..14e87c23 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -263,8 +263,8 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[1, 10, 50, 100, 150, 200], - interval_length_in_sec=[30, 30, 30, 30, 30, 30], + msg_per_sec_in_intervals=[1, 10, 50, 100, 150, 200, 250], + interval_length_in_sec=[60, 60, 60, 60, 60, 60, 60], ) ramp_up_test.execute() From 8568b237a7ad95ae76623b0cba54345ad68eefce Mon Sep 17 00:00:00 2001 From: Manuel F Date: Tue, 25 Feb 2025 14:24:18 +0100 Subject: [PATCH 03/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 14e87c23..e2189d53 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -263,8 +263,8 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[1, 10, 50, 100, 150, 200, 250], - interval_length_in_sec=[60, 60, 60, 60, 60, 60, 60], + msg_per_sec_in_intervals=[10, 50, 100, 150], + interval_length_in_sec=[120, 120, 120, 120], ) ramp_up_test.execute() From 39b11275f2ff8fa5348c93c7bcae7dddc58b90ca Mon Sep 17 00:00:00 2001 From: Manuel F Date: Tue, 25 Feb 2025 15:14:21 +0100 Subject: [PATCH 04/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index e2189d53..25d8e097 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -263,7 +263,7 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[10, 50, 100, 150], + msg_per_sec_in_intervals=[100, 150, 200, 250], interval_length_in_sec=[120, 120, 120, 120], ) ramp_up_test.execute() From 1d83c5542bf70b4d69980ca90e3218b5bab6e9ca Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:02:02 +0100 Subject: [PATCH 05/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 25d8e097..a0c504d5 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -270,9 +270,9 @@ def main(test_type_nr): case 2: burst_test = BurstTest( - normal_rate_msg_per_sec=20, - burst_rate_msg_per_sec=10000, - normal_rate_interval_length=10, + normal_rate_msg_per_sec=80, + burst_rate_msg_per_sec=1000, + normal_rate_interval_length=60, burst_rate_interval_length=2, number_of_intervals=3, ) @@ -302,4 +302,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(1) + main(2) From 8b15391ec290ecaeda25ca4cbe85d5d7867b62cf Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:28:10 +0100 Subject: [PATCH 06/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index a0c504d5..a0a68e4e 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -263,8 +263,8 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[100, 150, 200, 250], - interval_length_in_sec=[120, 120, 120, 120], + msg_per_sec_in_intervals=[1000, 0, 80], + interval_length_in_sec=[5, 120, 120], ) ramp_up_test.execute() @@ -302,4 +302,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(2) + main(1) From 278f2cb434d3b730a85f4df88e7e31e55d299fd8 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:32:05 +0100 Subject: [PATCH 07/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index a0a68e4e..03a6abd4 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -243,7 +243,11 @@ def execute(self): cur_index += 1 except KafkaError: logger.warning(KafkaError) - time.sleep(1.0 / self.msg_per_sec) + + if self.msg_per_sec > 0: + time.sleep(1.0 / self.msg_per_sec) + else: + time.sleep(1.0) logger.warning( f"Stop at: {datetime.datetime.now()}, sent {cur_index} messages in the " From 3f9e4190d2b837bcc90044fe5954b2a7cb0e7825 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:35:53 +0100 Subject: [PATCH 08/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 03a6abd4..08822211 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -268,7 +268,7 @@ def main(test_type_nr): case 1: ramp_up_test = RampUpTest( msg_per_sec_in_intervals=[1000, 0, 80], - interval_length_in_sec=[5, 120, 120], + interval_length_in_sec=[10, 20, 120], ) ramp_up_test.execute() From c712a20286582655cfa20e6eddf2733cd5a34aba Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:40:17 +0100 Subject: [PATCH 09/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 08822211..35977b40 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -157,7 +157,11 @@ def _execute_one_interval( cur_index += 1 except KafkaError: logger.warning(KafkaError) - time.sleep(1.0 / msg_per_sec) + + if msg_per_sec > 0: + time.sleep(1.0 / msg_per_sec) + else: + time.sleep(1.0) logger.warning(f"Finish interval with {msg_per_sec} msg/s") return cur_index From bc9cb283a84cc515bd66442a29ea454b172b9592 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:40:48 +0100 Subject: [PATCH 10/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 35977b40..8cd4961d 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -272,7 +272,7 @@ def main(test_type_nr): case 1: ramp_up_test = RampUpTest( msg_per_sec_in_intervals=[1000, 0, 80], - interval_length_in_sec=[10, 20, 120], + interval_length_in_sec=[10, 30, 60], ) ramp_up_test.execute() From 926cd24cfa8b296c4cb7f1fe18a81da927ab17ae Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:44:19 +0100 Subject: [PATCH 11/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 8cd4961d..64e95dc8 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -271,8 +271,8 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[1000, 0, 80], - interval_length_in_sec=[10, 30, 60], + msg_per_sec_in_intervals=[100, 150, 200, 250, 0, 100], + interval_length_in_sec=[120, 120, 120, 120, 20, 60], ) ramp_up_test.execute() From 3e9a4f4ec1d194d06d554b8f793ed2acbe1fe404 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 08:49:27 +0100 Subject: [PATCH 12/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 64e95dc8..04a3aceb 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -280,7 +280,7 @@ def main(test_type_nr): burst_test = BurstTest( normal_rate_msg_per_sec=80, burst_rate_msg_per_sec=1000, - normal_rate_interval_length=60, + normal_rate_interval_length=120, burst_rate_interval_length=2, number_of_intervals=3, ) @@ -310,4 +310,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(1) + main(2) From 627f41a04f0d40e3d83df54884ee821cf126a44e Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 09:00:44 +0100 Subject: [PATCH 13/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 04a3aceb..2b82bb8e 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -271,7 +271,7 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[100, 150, 200, 250, 0, 100], + msg_per_sec_in_intervals=[100, 150, 200, 250, 1, 100], interval_length_in_sec=[120, 120, 120, 120, 20, 60], ) ramp_up_test.execute() From 2b9be86f73f94948f4e1f6468171a6ac11238dcc Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 10:01:17 +0100 Subject: [PATCH 14/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 2b82bb8e..80ec6f95 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -278,7 +278,7 @@ def main(test_type_nr): case 2: burst_test = BurstTest( - normal_rate_msg_per_sec=80, + normal_rate_msg_per_sec=50, burst_rate_msg_per_sec=1000, normal_rate_interval_length=120, burst_rate_interval_length=2, From ac10ff49bd5bc2d11a19aff669bb0eb72017c846 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 12:21:33 +0100 Subject: [PATCH 15/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 80ec6f95..8d9268de 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -288,7 +288,7 @@ def main(test_type_nr): case 3: maximum_throughput_test = MaximumThroughputTest( - length_in_min=1, + length_in_min=5, ) maximum_throughput_test.execute() @@ -310,4 +310,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(2) + main(3) From b55f76b0df2dbf870455df32bcf6e0dd6fcac0ba Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 15:36:50 +0100 Subject: [PATCH 16/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 8d9268de..d073eedb 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -262,7 +262,7 @@ def execute(self): class MaximumThroughputTest(LongTermTest): """Keeps a consistent rate that is too high to be handled.""" - def __init__(self, length_in_min: float | int, msg_per_sec: int = 10000): + def __init__(self, length_in_min: float | int, msg_per_sec: int = 500): super().__init__(full_length_in_min=length_in_min, msg_per_sec=msg_per_sec) @@ -288,7 +288,7 @@ def main(test_type_nr): case 3: maximum_throughput_test = MaximumThroughputTest( - length_in_min=5, + length_in_min=2, ) maximum_throughput_test.execute() From 3b1d8b6b75c402c409ac9c2cf7b16069f42e5a91 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 17:04:09 +0100 Subject: [PATCH 17/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index d073eedb..b2e13865 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -247,11 +247,7 @@ def execute(self): cur_index += 1 except KafkaError: logger.warning(KafkaError) - - if self.msg_per_sec > 0: - time.sleep(1.0 / self.msg_per_sec) - else: - time.sleep(1.0) + time.sleep(1.0 / self.msg_per_sec) logger.warning( f"Stop at: {datetime.datetime.now()}, sent {cur_index} messages in the " @@ -271,8 +267,8 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[100, 150, 200, 250, 1, 100], - interval_length_in_sec=[120, 120, 120, 120, 20, 60], + msg_per_sec_in_intervals=[100, 150, 200, 250], + interval_length_in_sec=[120, 120, 120, 120], ) ramp_up_test.execute() @@ -310,4 +306,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(3) + main(1) From 17859131443a46addab4d7cb45649303bb8f37a9 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 26 Feb 2025 17:33:49 +0100 Subject: [PATCH 18/33] Update run_test.py --- docker/benchmark_tests/run_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index b2e13865..c9bcc0e3 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -267,7 +267,7 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[100, 150, 200, 250], + msg_per_sec_in_intervals=[10, 50, 100, 150], interval_length_in_sec=[120, 120, 120, 120], ) ramp_up_test.execute() From 094e6751dc37646cf0cae3e9c6432d69888aa324 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Sat, 1 Mar 2025 14:23:23 +0100 Subject: [PATCH 19/33] Update tests --- docker/benchmark_tests/run_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index c9bcc0e3..db859506 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -284,7 +284,7 @@ def main(test_type_nr): case 3: maximum_throughput_test = MaximumThroughputTest( - length_in_min=2, + length_in_min=5, ) maximum_throughput_test.execute() @@ -306,4 +306,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(1) + main(3) From d6c434b01f1594515b9fb1012003e489134acca3 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Sun, 2 Mar 2025 09:31:08 +0100 Subject: [PATCH 20/33] Update log_volumes.json --- .../dashboards/log_volumes.json | 124 +++++++++--------- 1 file changed, 62 insertions(+), 62 deletions(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index 1db3b3d2..70ba1c06 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -115,8 +115,8 @@ ] }, "gridPos": { - "h": 8, - "w": 9, + "h": 10, + "w": 12, "x": 0, "y": 0 }, @@ -299,9 +299,9 @@ ] }, "gridPos": { - "h": 8, + "h": 7, "w": 7, - "x": 9, + "x": 12, "y": 0 }, "id": 73, @@ -378,6 +378,36 @@ "title": "Entering and processed log lines per ${Granularity}", "type": "barchart" }, + { + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "gridPos": { + "h": 7, + "w": 5, + "x": 19, + "y": 0 + }, + "id": 74, + "options": { + "folderUID": "", + "includeVars": true, + "keepTime": true, + "maxItems": 10, + "query": "", + "showFolderNames": false, + "showHeadings": false, + "showRecentlyViewed": false, + "showSearch": true, + "showStarred": false, + "tags": [] + }, + "pluginVersion": "11.2.2+security-01", + "title": "Dashboards", + "transparent": true, + "type": "dashlist" + }, { "datasource": { "default": false, @@ -405,17 +435,17 @@ "overrides": [] }, "gridPos": { - "h": 5, - "w": 3, - "x": 16, - "y": 0 + "h": 3, + "w": 7, + "x": 12, + "y": 7 }, "id": 20, "options": { "colorMode": "value", "graphMode": "none", "justifyMode": "auto", - "orientation": "horizontal", + "orientation": "vertical", "percentChangeColorMode": "standard", "reduceOptions": { "calcs": [ @@ -453,7 +483,7 @@ "refId": "A" } ], - "title": "Max. number of entries per Batch", + "title": "Maximum fill level for Batches", "transformations": [ { "id": "rowsToFields", @@ -477,43 +507,13 @@ ], "type": "stat" }, - { - "datasource": { - "type": "datasource", - "uid": "grafana" - }, - "gridPos": { - "h": 7, - "w": 5, - "x": 19, - "y": 0 - }, - "id": 74, - "options": { - "folderUID": "", - "includeVars": true, - "keepTime": true, - "maxItems": 10, - "query": "", - "showFolderNames": false, - "showHeadings": false, - "showRecentlyViewed": false, - "showSearch": true, - "showStarred": false, - "tags": [] - }, - "pluginVersion": "11.2.2+security-01", - "title": "Dashboards", - "transparent": true, - "type": "dashlist" - }, { "collapsed": false, "gridPos": { "h": 1, "w": 24, "x": 0, - "y": 8 + "y": 10 }, "id": 72, "panels": [], @@ -604,7 +604,7 @@ "h": 9, "w": 15, "x": 0, - "y": 9 + "y": 11 }, "id": 55, "options": { @@ -792,7 +792,7 @@ "h": 9, "w": 9, "x": 15, - "y": 9 + "y": 11 }, "id": 47, "options": { @@ -869,7 +869,7 @@ "h": 1, "w": 24, "x": 0, - "y": 18 + "y": 20 }, "id": 59, "panels": [], @@ -938,7 +938,7 @@ "h": 6, "w": 6, "x": 0, - "y": 19 + "y": 21 }, "id": 50, "options": { @@ -1013,7 +1013,7 @@ "h": 3, "w": 5, "x": 6, - "y": 19 + "y": 21 }, "id": 60, "options": { @@ -1130,7 +1130,7 @@ "h": 6, "w": 6, "x": 12, - "y": 19 + "y": 21 }, "id": 52, "options": { @@ -1205,7 +1205,7 @@ "h": 3, "w": 5, "x": 18, - "y": 19 + "y": 21 }, "id": 64, "options": { @@ -1300,7 +1300,7 @@ "h": 3, "w": 5, "x": 6, - "y": 22 + "y": 24 }, "id": 61, "options": { @@ -1394,7 +1394,7 @@ "h": 3, "w": 5, "x": 18, - "y": 22 + "y": 24 }, "id": 67, "options": { @@ -1510,7 +1510,7 @@ "h": 6, "w": 6, "x": 0, - "y": 25 + "y": 27 }, "id": 51, "options": { @@ -1608,7 +1608,7 @@ "h": 3, "w": 3, "x": 6, - "y": 25 + "y": 27 }, "id": 62, "options": { @@ -1688,7 +1688,7 @@ "h": 3, "w": 2, "x": 9, - "y": 25 + "y": 27 }, "id": 68, "options": { @@ -1805,7 +1805,7 @@ "h": 6, "w": 6, "x": 12, - "y": 25 + "y": 27 }, "id": 53, "options": { @@ -1880,7 +1880,7 @@ "h": 3, "w": 5, "x": 18, - "y": 25 + "y": 27 }, "id": 66, "options": { @@ -1975,7 +1975,7 @@ "h": 3, "w": 5, "x": 6, - "y": 28 + "y": 30 }, "id": 63, "options": { @@ -2092,7 +2092,7 @@ "h": 3, "w": 5, "x": 18, - "y": 28 + "y": 30 }, "id": 65, "options": { @@ -2208,7 +2208,7 @@ "h": 6, "w": 6, "x": 12, - "y": 31 + "y": 33 }, "id": 54, "options": { @@ -2283,7 +2283,7 @@ "h": 3, "w": 5, "x": 18, - "y": 31 + "y": 33 }, "id": 70, "options": { @@ -2378,7 +2378,7 @@ "h": 3, "w": 5, "x": 18, - "y": 34 + "y": 36 }, "id": 69, "options": { @@ -2432,7 +2432,7 @@ "h": 1, "w": 24, "x": 0, - "y": 37 + "y": 39 }, "id": 76, "panels": [], @@ -2449,7 +2449,7 @@ "h": 7, "w": 24, "x": 0, - "y": 38 + "y": 40 }, "id": 75, "options": { From 6d88c9612d8a87a1918cbe4238534935efb45f3d Mon Sep 17 00:00:00 2001 From: Manuel F Date: Tue, 11 Mar 2025 16:24:47 +0100 Subject: [PATCH 21/33] Update log_volumes.json --- .../dashboards/log_volumes.json | 108 +++++++++--------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index 70ba1c06..84fb7f3d 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -116,7 +116,7 @@ }, "gridPos": { "h": 10, - "w": 12, + "w": 10, "x": 0, "y": 0 }, @@ -299,9 +299,9 @@ ] }, "gridPos": { - "h": 7, - "w": 7, - "x": 12, + "h": 10, + "w": 9, + "x": 10, "y": 0 }, "id": 73, @@ -436,8 +436,8 @@ }, "gridPos": { "h": 3, - "w": 7, - "x": 12, + "w": 5, + "x": 19, "y": 7 }, "id": 20, @@ -483,7 +483,7 @@ "refId": "A" } ], - "title": "Maximum fill level for Batches", + "title": "Maximum Batch fill levels", "transformations": [ { "id": "rowsToFields", @@ -736,7 +736,7 @@ "refId": "BatchHandler" } ], - "title": "Log Volume Combined", + "title": "Log volume combined", "type": "timeseries" }, { @@ -896,8 +896,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 11, - "gradientMode": "none", + "fillOpacity": 27, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, @@ -905,6 +905,9 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", + "lineStyle": { + "fill": "solid" + }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -920,7 +923,6 @@ "mode": "off" } }, - "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -928,6 +930,10 @@ { "color": "green", "value": null + }, + { + "color": "red", + "value": 80 } ] } @@ -940,13 +946,13 @@ "x": 0, "y": 21 }, - "id": 50, + "id": 51, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": false + "showLegend": true }, "tooltip": { "mode": "single", @@ -974,11 +980,34 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "A" + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Batch" + }, + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "PDEE91DDB90597936" + }, + "editorType": "sql", + "format": 1, + "hide": false, + "meta": { + "builderOptions": { + "columns": [], + "database": "", + "limit": 1000, + "mode": "list", + "queryType": "table", + "table": "" + } + }, + "pluginVersion": "4.5.1", + "queryType": "table", + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Buffer" } ], - "title": "Collector Fill State", + "title": "BatchHandler Fill State", "type": "timeseries" }, { @@ -1462,8 +1491,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 27, - "gradientMode": "opacity", + "fillOpacity": 11, + "gradientMode": "none", "hideFrom": { "legend": false, "tooltip": false, @@ -1471,9 +1500,6 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", - "lineStyle": { - "fill": "solid" - }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -1489,6 +1515,7 @@ "mode": "off" } }, + "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -1496,10 +1523,6 @@ { "color": "green", "value": null - }, - { - "color": "red", - "value": 80 } ] } @@ -1512,13 +1535,13 @@ "x": 0, "y": 27 }, - "id": 51, + "id": 50, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "mode": "single", @@ -1546,34 +1569,11 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Batch" - }, - { - "datasource": { - "type": "grafana-clickhouse-datasource", - "uid": "PDEE91DDB90597936" - }, - "editorType": "sql", - "format": 1, - "hide": false, - "meta": { - "builderOptions": { - "columns": [], - "database": "", - "limit": 1000, - "mode": "list", - "queryType": "table", - "table": "" - } - }, - "pluginVersion": "4.5.1", - "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Buffer" + "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "A" } ], - "title": "BatchHandler Fill State", + "title": "Collector Fill State", "type": "timeseries" }, { @@ -2549,7 +2549,7 @@ "type": "nodeGraph" } ], - "refresh": "5s", + "refresh": "auto", "schemaVersion": 39, "tags": [], "templating": { From 92184c313c273e1c8e6fc38b4d4efa92be76d4be Mon Sep 17 00:00:00 2001 From: Manuel F Date: Fri, 14 Mar 2025 09:07:20 +0100 Subject: [PATCH 22/33] Update log_volumes.json --- docker/grafana-provisioning/dashboards/log_volumes.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index 84fb7f3d..005f65b5 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -346,7 +346,7 @@ "table": "" } }, - "pluginVersion": "4.6.0", + "pluginVersion": "4.8.2", "queryType": "table", "rawSql": "SELECT \n toStartOf${Granularity}(toDateTime64(timestamp_in, 6)) AS time_bucket,\n count(*) AS \" \"\nFROM server_logs\nWHERE timestamp_in >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp_in <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\nGROUP BY time_bucket\nORDER BY time_bucket\nWITH FILL\nFROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\nTO toStartOf${Granularity}(toDateTime64($__toTime, 6))\nSTEP toInterval${Granularity}(1);", "refId": "Entering" @@ -369,9 +369,9 @@ "table": "" } }, - "pluginVersion": "4.7.0", + "pluginVersion": "4.8.2", "queryType": "table", - "rawSql": "SELECT time_bucket, -sum(number) as \"(negative)\"\nFROM (\n SELECT \n toStartOf${Granularity}(toDateTime64(timestamp_failed, 6)) AS time_bucket,\n count(DISTINCT message_text) AS number\n FROM failed_dns_loglines\n WHERE timestamp_failed >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp_failed <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n GROUP BY time_bucket\n ORDER BY time_bucket\n WITH FILL\n FROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\n TO toStartOf${Granularity}(toDateTime64($__toTime, 6))\n STEP toInterval${Granularity}(1)\n\n UNION ALL\n\n SELECT \n toStartOf${Granularity}(toDateTime64(timestamp, 6)) AS time_bucket,\n count(DISTINCT logline_id) AS number\n FROM logline_timestamps\n WHERE is_active = False\n AND timestamp >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n GROUP BY time_bucket\n ORDER BY time_bucket\n WITH FILL\n FROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\n TO toStartOf${Granularity}(toDateTime64($__toTime, 6))\n STEP toInterval${Granularity}(1)\n)\nGROUP BY time_bucket;", + "rawSql": "SELECT time_bucket, -sum(count) AS \" (negative)\"\nFROM (\n SELECT toStartOf${Granularity}(timestamp) AS time_bucket, count(DISTINCT logline_id) AS count\n FROM (\n SELECT logline_id, timestamp\n FROM (\n SELECT logline_id, timestamp, ROW_NUMBER() OVER (PARTITION BY logline_id ORDER BY timestamp DESC) AS rn\n FROM logline_timestamps\n WHERE is_active = false\n )\n WHERE\n rn = 1\n )\n GROUP BY time_bucket\n\n UNION ALL\n\n SELECT toStartOf${Granularity}(timestamp_failed) AS time_bucket, count(DISTINCT message_text) AS count\n FROM failed_dns_loglines\n GROUP BY time_bucket\n)\nGROUP BY time_bucket\nORDER BY time_bucket;", "refId": "Processed" } ], @@ -2549,7 +2549,7 @@ "type": "nodeGraph" } ], - "refresh": "auto", + "refresh": "5s", "schemaVersion": 39, "tags": [], "templating": { From e378d22e8521a0139a2059619a0c976d9cc634a9 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Fri, 14 Mar 2025 09:35:56 +0100 Subject: [PATCH 23/33] Update log_volumes.json --- docker/grafana-provisioning/dashboards/log_volumes.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index 005f65b5..25911986 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -547,7 +547,7 @@ "tooltip": false, "viz": false }, - "insertNulls": false, + "insertNulls": 60000, "lineInterpolation": "stepAfter", "lineWidth": 1, "pointSize": 5, From 3aeafe315931cea64355c9c1d405809465103b50 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Fri, 14 Mar 2025 09:39:21 +0100 Subject: [PATCH 24/33] Update log_volumes.json --- docker/grafana-provisioning/dashboards/log_volumes.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index 25911986..4912f46a 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -371,7 +371,7 @@ }, "pluginVersion": "4.8.2", "queryType": "table", - "rawSql": "SELECT time_bucket, -sum(count) AS \" (negative)\"\nFROM (\n SELECT toStartOf${Granularity}(timestamp) AS time_bucket, count(DISTINCT logline_id) AS count\n FROM (\n SELECT logline_id, timestamp\n FROM (\n SELECT logline_id, timestamp, ROW_NUMBER() OVER (PARTITION BY logline_id ORDER BY timestamp DESC) AS rn\n FROM logline_timestamps\n WHERE is_active = false\n )\n WHERE\n rn = 1\n )\n GROUP BY time_bucket\n\n UNION ALL\n\n SELECT toStartOf${Granularity}(timestamp_failed) AS time_bucket, count(DISTINCT message_text) AS count\n FROM failed_dns_loglines\n GROUP BY time_bucket\n)\nGROUP BY time_bucket\nORDER BY time_bucket;", + "rawSql": "SELECT time_bucket, -sum(count) AS \" (negative)\"\nFROM (\n SELECT toStartOf${Granularity}(timestamp) AS time_bucket, count(DISTINCT logline_id) AS count\n FROM (\n SELECT logline_id, timestamp\n FROM (\n SELECT logline_id, timestamp, ROW_NUMBER() OVER (PARTITION BY logline_id ORDER BY timestamp DESC) AS rn\n FROM logline_timestamps\n WHERE is_active = false\n )\n WHERE rn = 1\n AND timestamp >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n )\n GROUP BY time_bucket\n\n UNION ALL\n\n SELECT toStartOf${Granularity}(timestamp_failed) AS time_bucket, count(DISTINCT message_text) AS count\n FROM failed_dns_loglines\n WHERE timestamp_failed >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp_failed <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n GROUP BY time_bucket\n)\nGROUP BY time_bucket\nORDER BY time_bucket\nWITH FILL\nFROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\nTO toStartOf${Granularity}(toDateTime64($__toTime, 6))\nSTEP toInterval${Granularity}(1);", "refId": "Processed" } ], From 78d826213d70b2990188e7bcd9ce69dbb988c5f3 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Sat, 22 Mar 2025 16:55:50 +0100 Subject: [PATCH 25/33] Fix bug when inserting alerts --- src/monitoring/clickhouse_batch_sender.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/monitoring/clickhouse_batch_sender.py b/src/monitoring/clickhouse_batch_sender.py index ed27b356..c8435c81 100644 --- a/src/monitoring/clickhouse_batch_sender.py +++ b/src/monitoring/clickhouse_batch_sender.py @@ -150,8 +150,8 @@ def __init__(self): "alerts", { "client_ip": str, - "alert_timestamp": datetime.datetime, "suspicious_batch_id": uuid.UUID, + "alert_timestamp": datetime.datetime, "overall_score": float, "domain_names": str, "result": str, From 904298168d9d84c17d145ea1b9eb00ecf63a8bde Mon Sep 17 00:00:00 2001 From: Manuel F Date: Sat, 22 Mar 2025 20:37:58 +0100 Subject: [PATCH 26/33] Update log_volumes.json --- .../dashboards/log_volumes.json | 94 +++++++++---------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index 4912f46a..fbaa7300 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -896,8 +896,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 27, - "gradientMode": "opacity", + "fillOpacity": 11, + "gradientMode": "none", "hideFrom": { "legend": false, "tooltip": false, @@ -905,9 +905,6 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", - "lineStyle": { - "fill": "solid" - }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -923,6 +920,7 @@ "mode": "off" } }, + "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -930,10 +928,6 @@ { "color": "green", "value": null - }, - { - "color": "red", - "value": 80 } ] } @@ -946,13 +940,13 @@ "x": 0, "y": 21 }, - "id": 51, + "id": 50, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "mode": "single", @@ -980,34 +974,11 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Batch" - }, - { - "datasource": { - "type": "grafana-clickhouse-datasource", - "uid": "PDEE91DDB90597936" - }, - "editorType": "sql", - "format": 1, - "hide": false, - "meta": { - "builderOptions": { - "columns": [], - "database": "", - "limit": 1000, - "mode": "list", - "queryType": "table", - "table": "" - } - }, - "pluginVersion": "4.5.1", - "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Buffer" + "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "A" } ], - "title": "BatchHandler Fill State", + "title": "Collector Fill State", "type": "timeseries" }, { @@ -1491,8 +1462,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 11, - "gradientMode": "none", + "fillOpacity": 27, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, @@ -1500,6 +1471,9 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", + "lineStyle": { + "fill": "solid" + }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -1515,7 +1489,6 @@ "mode": "off" } }, - "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -1523,6 +1496,10 @@ { "color": "green", "value": null + }, + { + "color": "red", + "value": 80 } ] } @@ -1535,13 +1512,13 @@ "x": 0, "y": 27 }, - "id": 50, + "id": 51, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": false + "showLegend": true }, "tooltip": { "mode": "single", @@ -1569,11 +1546,34 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "A" + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Batch" + }, + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "PDEE91DDB90597936" + }, + "editorType": "sql", + "format": 1, + "hide": false, + "meta": { + "builderOptions": { + "columns": [], + "database": "", + "limit": 1000, + "mode": "list", + "queryType": "table", + "table": "" + } + }, + "pluginVersion": "4.5.1", + "queryType": "table", + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Buffer" } ], - "title": "Collector Fill State", + "title": "BatchHandler Fill State", "type": "timeseries" }, { @@ -2549,7 +2549,7 @@ "type": "nodeGraph" } ], - "refresh": "5s", + "refresh": "auto", "schemaVersion": 39, "tags": [], "templating": { @@ -2579,7 +2579,7 @@ }, { "current": { - "selected": false, + "selected": true, "text": [ "All" ], From 5d7d0d7bcd1008a67ea62f72340d2b91c6fff134 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 2 Apr 2025 15:18:44 +0200 Subject: [PATCH 27/33] Update test_clickhouse_batch_sender.py --- .../test_clickhouse_batch_sender.py | 203 ++++++------------ 1 file changed, 70 insertions(+), 133 deletions(-) diff --git a/tests/clickhouse/test_clickhouse_batch_sender.py b/tests/clickhouse/test_clickhouse_batch_sender.py index f152cefd..51ee226f 100644 --- a/tests/clickhouse/test_clickhouse_batch_sender.py +++ b/tests/clickhouse/test_clickhouse_batch_sender.py @@ -1,188 +1,125 @@ import unittest from unittest.mock import patch, Mock -from src.monitoring.clickhouse_batch_sender import ClickHouseBatchSender +from src.monitoring.clickhouse_batch_sender import ClickHouseBatchSender, Table class TestInit(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.BATCH_SIZE", 50) - @patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5) - @patch("src.monitoring.clickhouse_batch_sender.CLICKHOUSE_HOSTNAME", "test_name") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_init(self, mock_clickhouse_connect): - # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] + def test_successful(self): # Act - sut = ClickHouseBatchSender(table_name, column_names) + with ( + patch("src.monitoring.clickhouse_batch_sender.BATCH_SIZE", 50), + patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5), + patch( + "src.monitoring.clickhouse_batch_sender.CLICKHOUSE_HOSTNAME", + "test_name", + ), + patch( + "src.monitoring.clickhouse_batch_sender.clickhouse_connect" + ) as mock_clickhouse_connect, + ): + sut = ClickHouseBatchSender() # Assert - self.assertEqual(table_name, sut.table_name) - self.assertEqual(column_names, sut.column_names) + self.assertIsNotNone(sut.tables) self.assertEqual(50, sut.max_batch_size) self.assertEqual(0.5, sut.batch_timeout) self.assertIsNone(sut.timer) - self.assertEqual([], sut.batch) + self.assertIsNotNone(sut.lock) + self.assertEqual({key: [] for key in sut.tables}, sut.batch) mock_clickhouse_connect.get_client.assert_called_once_with(host="test_name") class TestDel(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_del(self, mock_clickhouse_connect): - # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_del(self): # Act with patch( "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all" ) as mock_insert_all: - del sut + del self.sut # Assert mock_insert_all.assert_called_once() class TestAdd(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_list_of_str_successful( - self, mock_clickhouse_connect, mock_start_timer, mock_insert_all - ): - # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) - data = ["entry_1", "entry_2"] + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() - # Act - sut.add(data) - - # Assert - self.assertEqual([["entry_1", "entry_2"]], sut.batch) - - mock_insert_all.assert_not_called() - mock_start_timer.assert_called_once() - - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_timer_already_started( - self, mock_clickhouse_connect, mock_start_timer, mock_insert_all - ): + def test_single_list_with_starting_timer(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) + test_table_name = "test_table" + test_data = {"value_1": 1, "value_2": 2} - data = ["entry_1", "entry_2"] - sut.timer = Mock() + self.sut.tables = {test_table_name: Table(test_table_name, {})} + self.sut.batch = {test_table_name: []} # Act - sut.add(data) + with ( + patch("src.monitoring.clickhouse_batch_sender.Table.verify"), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer" + ) as mock_start_timer, + ): + self.sut.add(test_table_name, test_data) # Assert - self.assertEqual([["entry_1", "entry_2"]], sut.batch) - - mock_insert_all.assert_not_called() - mock_start_timer.assert_not_called() - - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_max_size_reached_and_timer_already_started( - self, mock_clickhouse_connect, mock_start_timer, mock_insert_all - ): - # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) - - data = ["entry_1", "entry_2"] - sut.timer = Mock() - sut.max_batch_size = 1 - - # Act - sut.add(data) - - # Assert - self.assertEqual([["entry_1", "entry_2"]], sut.batch) - - mock_insert_all.assert_called_once() - mock_start_timer.assert_not_called() + self.sut.batch = {test_table_name: [1, 2]} + mock_start_timer.assert_called_once() - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_list_of_str_wrong_field_number( - self, mock_clickhouse_connect, mock_start_timer, mock_insert_all - ): + def test_timer_already_started(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1"] - sut = ClickHouseBatchSender(table_name, column_names) + test_table_name = "test_table" + test_data = {"value_1": 1, "value_2": 2} - data = ["entry_1", "entry_2"] + self.sut.tables = {test_table_name: Table(test_table_name, {})} + self.sut.batch = {test_table_name: []} + self.sut.timer = Mock() # Act - with self.assertRaises(ValueError): - sut.add(data) + with ( + patch("src.monitoring.clickhouse_batch_sender.Table.verify"), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer" + ) as mock_start_timer, + ): + self.sut.add(test_table_name, test_data) # Assert - self.assertEqual([], sut.batch) - - mock_insert_all.assert_not_called() mock_start_timer.assert_not_called() - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_list_of_lists_successful( - self, mock_clickhouse_connect, mock_start_timer, mock_insert_all - ): + def test_max_batch_size_reached(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) + test_table_name = "test_table" + test_data = {"value_1": 1, "value_2": 2} - data = [["entry_1", "entry_2"], ["entry_3", "entry_4"]] + self.sut.tables = {test_table_name: Table(test_table_name, {})} + self.sut.batch = {test_table_name: []} + self.sut.max_batch_size = 1 # Act - sut.add(data) + with ( + patch("src.monitoring.clickhouse_batch_sender.Table.verify"), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert" + ) as mock_insert, + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer" + ), + ): + self.sut.add(test_table_name, test_data) # Assert - self.assertEqual([["entry_1", "entry_2"], ["entry_3", "entry_4"]], sut.batch) - - mock_insert_all.assert_not_called() - mock_start_timer.assert_called_once() - - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_list_of_lists_wrong_field_number( - self, mock_clickhouse_connect, mock_start_timer, mock_insert_all - ): - # Arrange - table_name = "test_table_name" - column_names = ["col_1"] - sut = ClickHouseBatchSender(table_name, column_names) - - data = [["entry_1", "entry_2"], ["entry_3"]] - - # Act - with self.assertRaises(ValueError): - sut.add(data) - - # Assert - self.assertEqual([], sut.batch) - - mock_insert_all.assert_not_called() - mock_start_timer.assert_not_called() + mock_insert.assert_called_once_with(test_table_name) class TestInsertAll(unittest.TestCase): From 3370338e71ef65fb7deb566e5f78023ca800cad4 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Wed, 2 Apr 2025 16:17:42 +0200 Subject: [PATCH 28/33] Update test_clickhouse_batch_sender.py --- src/monitoring/clickhouse_batch_sender.py | 4 +- .../test_clickhouse_batch_sender.py | 92 ++++++++++++------- 2 files changed, 59 insertions(+), 37 deletions(-) diff --git a/src/monitoring/clickhouse_batch_sender.py b/src/monitoring/clickhouse_batch_sender.py index c8435c81..f01134df 100644 --- a/src/monitoring/clickhouse_batch_sender.py +++ b/src/monitoring/clickhouse_batch_sender.py @@ -207,8 +207,8 @@ def insert(self, table_name: str): def insert_all(self): """Inserts the batch for every table.""" - for e in self.batch: - self.insert(e) + for table in self.batch: + self.insert(table) if self.timer: self.timer.cancel() diff --git a/tests/clickhouse/test_clickhouse_batch_sender.py b/tests/clickhouse/test_clickhouse_batch_sender.py index 51ee226f..8c4dc498 100644 --- a/tests/clickhouse/test_clickhouse_batch_sender.py +++ b/tests/clickhouse/test_clickhouse_batch_sender.py @@ -5,7 +5,6 @@ class TestInit(unittest.TestCase): - def test_successful(self): # Act with ( @@ -33,7 +32,6 @@ def test_successful(self): class TestDel(unittest.TestCase): - def setUp(self): with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): self.sut = ClickHouseBatchSender() @@ -50,7 +48,6 @@ def test_del(self): class TestAdd(unittest.TestCase): - def setUp(self): with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): self.sut = ClickHouseBatchSender() @@ -122,51 +119,76 @@ def test_max_batch_size_reached(self): mock_insert.assert_called_once_with(test_table_name) -class TestInsertAll(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_insert_all(self, mock_clickhouse_connect): +class TestInsert(unittest.TestCase): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_filled_batch(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) - sut._client = Mock() - sut.batch = [["entry_1", "entry_2"], ["entry_3", "entry_4"]] + test_table_name = "test_table" + + self.sut.tables = { + test_table_name: Table(test_table_name, {"col_1": str, "col_2": str}) + } + self.sut.batch = {test_table_name: ["one", "two", "three"]} + self.sut._client = Mock() # Act - sut.insert_all() + self.sut.insert(test_table_name) # Assert - self.assertEqual([], sut.batch) - self.assertIsNone(sut.timer) - - sut._client.insert.assert_called_once_with( - table_name, - [["entry_1", "entry_2"], ["entry_3", "entry_4"]], - column_names=column_names, + self.sut._client.insert.assert_called_once_with( + test_table_name, + ["one", "two", "three"], + column_names=["col_1", "col_2"], ) + self.assertEquals([], self.sut.batch[test_table_name]) - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_insert_all_with_timer(self, mock_clickhouse_connect): + def test_empty_batch(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) - sut._client = Mock() - sut.timer = Mock() - sut.batch = [["entry_1", "entry_2"]] + test_table_name = "test_table" + + self.sut.tables = { + test_table_name: Table(test_table_name, {"col_1": str, "col_2": str}) + } + self.sut.batch = {test_table_name: []} + self.sut._client = Mock() # Act - sut.insert_all() + self.sut.insert(test_table_name) # Assert - self.assertEqual([], sut.batch) - self.assertIsNone(sut.timer) + self.sut._client.insert.assert_not_called() + self.assertEquals([], self.sut.batch[test_table_name]) - sut._client.insert.assert_called_once_with( - table_name, - [["entry_1", "entry_2"]], - column_names=column_names, - ) + +class TestInsertAll(unittest.TestCase): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_successful(self): + # Arrange + test_table_name_1 = "test_table_1" + test_table_name_2 = "test_table_2" + + self.sut.tables = { + test_table_name_1: Table(test_table_name_1, {}), + test_table_name_2: Table(test_table_name_2, {}), + } + self.sut.batch = {test_table_name_1: [1, 2, 3], test_table_name_2: [4, 5]} + self.sut.timer = Mock() + + # Act + with patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert" + ) as mock_insert: + self.sut.insert_all() + + # Assert + mock_insert.assert_any_call(test_table_name_1) + mock_insert.assert_any_call(test_table_name_2) class TestStartTimer(unittest.TestCase): From c9e50b86da5bb017d06a6e270b7cc8be07b9a1e7 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Thu, 3 Apr 2025 12:04:48 +0200 Subject: [PATCH 29/33] Update test_clickhouse_batch_sender.py --- .../test_clickhouse_batch_sender.py | 61 ++++++++++--------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/tests/clickhouse/test_clickhouse_batch_sender.py b/tests/clickhouse/test_clickhouse_batch_sender.py index 8c4dc498..fe973ada 100644 --- a/tests/clickhouse/test_clickhouse_batch_sender.py +++ b/tests/clickhouse/test_clickhouse_batch_sender.py @@ -192,43 +192,46 @@ def test_successful(self): class TestStartTimer(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5) - @patch("src.monitoring.clickhouse_batch_sender.Timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_start_timer(self, mock_clickhouse_connect, mock_timer): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_without_existing_timer(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) + self.sut.timer = None # Act - sut._start_timer() + with ( + patch("src.monitoring.clickhouse_batch_sender.Timer") as mock_timer, + patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 5), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all" + ) as mock_insert_all, + ): + self.sut._start_timer() # Assert - mock_timer.assert_called_once_with( - 0.5, - sut.insert_all, - ) - mock_timer.cancel.assert_not_called() - sut.timer.start.assert_called_once() + mock_timer.assert_called_once_with(5, mock_insert_all) + # noinspection PyUnresolvedReferences + self.sut.timer.start.assert_called_once() - @patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5) - @patch("src.monitoring.clickhouse_batch_sender.Timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_start_timer_with_running_timer(self, mock_clickhouse_connect, mock_timer): + def test_with_existing_timer(self): # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender(table_name, column_names) - sut.timer = mock_timer + self.sut.timer = Mock() # Act - sut._start_timer() + with ( + patch("src.monitoring.clickhouse_batch_sender.Timer") as mock_timer, + patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 5), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all" + ) as mock_insert_all, + patch.object(self.sut.timer, "cancel") as mock_cancel, + ): + self.sut._start_timer() # Assert - mock_timer.assert_called_once_with( - 0.5, - sut.insert_all, - ) - mock_timer.cancel.assert_called_once() - sut.timer.start.assert_called_once() + mock_cancel.assert_called_once() + mock_timer.assert_called_once_with(5, mock_insert_all) + # noinspection PyUnresolvedReferences + self.sut.timer.start.assert_called_once() From 38ac137c2a745f8d2c3e6e833c1d1c24baac3251 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Thu, 3 Apr 2025 12:06:05 +0200 Subject: [PATCH 30/33] Remove test_clickhouse_connector.py due to deletion of clickhouse_connector.py --- tests/clickhouse/test_clickhouse_connector.py | 792 ------------------ 1 file changed, 792 deletions(-) delete mode 100644 tests/clickhouse/test_clickhouse_connector.py diff --git a/tests/clickhouse/test_clickhouse_connector.py b/tests/clickhouse/test_clickhouse_connector.py deleted file mode 100644 index 48e12bec..00000000 --- a/tests/clickhouse/test_clickhouse_connector.py +++ /dev/null @@ -1,792 +0,0 @@ -import json -import unittest -from unittest.mock import patch, MagicMock, mock_open - -from src.monitoring.clickhouse_connector import * - - -class TestClickHouseConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - table_name = "test_table" - column_names = ["col_1", "col_2", "col_3"] - - # Act - sut = ClickHouseConnector(table_name, column_names) - - # Assert - self.assertEqual(table_name, sut._table_name) - self.assertEqual(column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=table_name, - column_names=column_names, - ) - - @patch("src.monitoring.clickhouse_connector.os.path.join") - @patch( - "src.monitoring.clickhouse_connector.open", - new_callable=mock_open, - read_data="CREATE TABLE test;", - ) - @patch("src.monitoring.clickhouse_connector.clickhouse_connect.get_client") - def test_prepare_table_success( - self, mock_get_client, mock_open_file, mock_path_join - ): - # Arrange - mock_client = MagicMock() - mock_get_client.return_value.__enter__.return_value = mock_client - mock_path_join.return_value = "/fake/path/test_table.sql" - - sut = ClickHouseConnector("test_table", ["col_1", "col_2", "col_3"]) - - # Act - sut.prepare_table() - - # Assert - mock_open_file.assert_called_once_with("/fake/path/test_table.sql", "r") - mock_client.command.assert_called_once_with("CREATE TABLE test;") - - @patch("src.monitoring.clickhouse_connector.os.path.join") - @patch( - "src.monitoring.clickhouse_connector.open", - new_callable=mock_open, - read_data="CREATE TABLE test;", - ) - @patch("src.monitoring.clickhouse_connector.clickhouse_connect.get_client") - @patch("src.monitoring.clickhouse_connector.logger") - def test_prepare_table_failure( - self, mock_logger, mock_get_client, mock_open_file, mock_path_join - ): - mock_client = MagicMock() - mock_get_client.return_value.__enter__.return_value = mock_client - mock_path_join.return_value = "/fake/path/test_table.sql" - mock_client.command.side_effect = Exception("Test exception") - - sut = ClickHouseConnector("test_table", ["col_1", "col_2", "col_3"]) - - with self.assertRaises(Exception): - sut.prepare_table() - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_add_to_batch(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - sut = ClickHouseConnector("test_table", ["col_1", "col_2", "col_3"]) - - # Act - sut._add_to_batch("test_data") - - # Assert - mock_clickhouse_batch_sender_instance.add.assert_called_once_with("test_data") - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert(self, mock_clickhouse_batch_sender): - # Arrange - sut = ClickHouseConnector("test_table", ["col_1", "col_2", "col_3"]) - - # Act - sut.insert("test_data") - - -class TestServerLogsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "server_logs" - expected_column_names = [ - "message_id", - "timestamp_in", - "message_text", - ] - - # Act - sut = ServerLogsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - message_text = "test_message_text" - message_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - timestamp_in = datetime.datetime(2034, 12, 13, 12, 34, 12, 132412) - - sut = ServerLogsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - message_text=message_text, - message_id=message_id, - timestamp_in=timestamp_in, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - datetime.datetime(2034, 12, 13, 12, 34, 12, 132412), - "test_message_text", - ] - ) - - -class TestServerLogsTimestampsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "server_logs_timestamps" - expected_column_names = [ - "message_id", - "event", - "event_timestamp", - ] - - # Act - sut = ServerLogsTimestampsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - message_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - event = "test_event" - event_timestamp = datetime.datetime(2034, 12, 13, 12, 34, 12, 132412) - - sut = ServerLogsTimestampsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - message_id=message_id, - event=event, - event_timestamp=event_timestamp, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - "test_event", - datetime.datetime(2034, 12, 13, 12, 34, 12, 132412), - ] - ) - - -class TestFailedDNSLoglinesConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "failed_dns_loglines" - expected_column_names = [ - "message_text", - "timestamp_in", - "timestamp_failed", - "reason_for_failure", - ] - - # Act - sut = FailedDNSLoglinesConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - message_text = "test_message_text" - timestamp_in = datetime.datetime(2034, 12, 13, 12, 34, 12, 132412) - timestamp_failed = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - reason_for_failure = "Wrong client_ip field" - - sut = FailedDNSLoglinesConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - message_text=message_text, - timestamp_in=timestamp_in, - timestamp_failed=timestamp_failed, - reason_for_failure=reason_for_failure, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - "test_message_text", - datetime.datetime(2034, 12, 13, 12, 34, 12, 132412), - datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - "Wrong client_ip field", - ] - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_none_given(self, mock_clickhouse_batch_sender): - # Arrange - message_text = "test_message_text" - timestamp_in = datetime.datetime(2034, 12, 13, 12, 34, 12, 132412) - timestamp_failed = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - - sut = FailedDNSLoglinesConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - message_text=message_text, - timestamp_in=datetime.datetime(2034, 12, 13, 12, 34, 12, 132412), - timestamp_failed=datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - reason_for_failure=None, - ) - - # Assert - mock_add_to_batch.assert_called_once() - - -class TestLoglineToBatchesConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "logline_to_batches" - expected_column_names = [ - "logline_id", - "batch_id", - ] - - # Act - sut = LoglineToBatchesConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - logline_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - batch_id = uuid.UUID("1f855c43-8a75-4b53-b6cd-4a13b89312d6") - - sut = LoglineToBatchesConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - logline_id=logline_id, - batch_id=batch_id, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - uuid.UUID("1f855c43-8a75-4b53-b6cd-4a13b89312d6"), - ] - ) - - -class TestDNSLoglinesConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "dns_loglines" - expected_column_names = [ - "logline_id", - "subnet_id", - "timestamp", - "status_code", - "client_ip", - "record_type", - "additional_fields", - ] - - # Act - sut = DNSLoglinesConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - logline_id = uuid.UUID("d7add097-40a5-42f6-89df-1e7b20c4a4b8") - subnet_id = "127.0.0.0_24" - timestamp = datetime.datetime(2024, 12, 6, 13, 41, 53, 589594) - status_code = "NXDOMAIN" - client_ip = "127.0.0.1" - record_type = "A" - additional_fields = json.dumps(dict(test="some_field")) - - sut = DNSLoglinesConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - logline_id=logline_id, - subnet_id=subnet_id, - timestamp=timestamp, - status_code=status_code, - client_ip=client_ip, - record_type=record_type, - additional_fields=additional_fields, - ) - - # Assert - mock_add_to_batch.assert_called_once() - - -class TestLoglineTimestampsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "logline_timestamps" - expected_column_names = [ - "logline_id", - "stage", - "status", - "timestamp", - "is_active", - ] - - # Act - sut = LoglineTimestampsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - logline_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - stage = "prefilter" - status = "prefilter_out" - timestamp = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - - sut = LoglineTimestampsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - logline_id=logline_id, - stage=stage, - status=status, - timestamp=timestamp, - is_active=True, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - "prefilter", - "prefilter_out", - datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - True, - ] - ) - - -class TestBatchTimestampsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "batch_timestamps" - expected_column_names = [ - "batch_id", - "stage", - "status", - "timestamp", - "is_active", - "message_count", - ] - - # Act - sut = BatchTimestampsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - batch_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - stage = "prefilter" - status = "prefilter_out" - timestamp = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - message_count = 456 - - sut = BatchTimestampsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - batch_id=batch_id, - stage=stage, - status=status, - timestamp=timestamp, - is_active=True, - message_count=message_count, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - "prefilter", - "prefilter_out", - datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - True, - 456, - ] - ) - - -class TestSuspiciousBatchesToBatchConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "suspicious_batches_to_batch" - expected_column_names = [ - "suspicious_batch_id", - "batch_id", - ] - - # Act - sut = SuspiciousBatchesToBatchConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - suspicious_batch_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - batch_id = uuid.UUID("1f855c43-8a75-4b53-b6cd-4a13b89312d6") - - sut = SuspiciousBatchesToBatchConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - suspicious_batch_id=suspicious_batch_id, - batch_id=batch_id, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - uuid.UUID("1f855c43-8a75-4b53-b6cd-4a13b89312d6"), - ] - ) - - -class TestSuspiciousBatchTimestampsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "suspicious_batch_timestamps" - expected_column_names = [ - "suspicious_batch_id", - "client_ip", - "stage", - "status", - "timestamp", - "is_active", - "message_count", - ] - - # Act - sut = SuspiciousBatchTimestampsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - suspicious_batch_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - client_ip = "127.0.0.1" - stage = "prefilter" - status = "prefilter_out" - timestamp = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - message_count = 456 - - sut = SuspiciousBatchTimestampsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - suspicious_batch_id=suspicious_batch_id, - client_ip=client_ip, - stage=stage, - status=status, - timestamp=timestamp, - is_active=True, - message_count=message_count, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - "127.0.0.1", - "prefilter", - "prefilter_out", - datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - True, - 456, - ] - ) - - -class TestAlertsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "alerts" - expected_column_names = [ - "client_ip", - "alert_timestamp", - "suspicious_batch_id", - "overall_score", - "domain_names", - "result", - ] - - # Act - sut = AlertsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - client_ip = "127.0.0.1" - alert_timestamp = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - suspicious_batch_id = uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff") - overall_score = 15.4 - domain_names = "random.de" - result = "test" - - sut = AlertsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - client_ip=client_ip, - alert_timestamp=alert_timestamp, - suspicious_batch_id=suspicious_batch_id, - overall_score=overall_score, - domain_names=domain_names, - result=result, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - "127.0.0.1", - datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - uuid.UUID("7299539b-6215-4f6b-b39f-69335aafbeff"), - 15.4, - "random.de", - "test", - ] - ) - - -class TestFillLevelsConnector(unittest.TestCase): - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_init(self, mock_clickhouse_batch_sender): - # Arrange - mock_clickhouse_batch_sender_instance = MagicMock() - mock_clickhouse_batch_sender.return_value = ( - mock_clickhouse_batch_sender_instance - ) - - expected_table_name = "fill_levels" - expected_column_names = [ - "timestamp", - "stage", - "entry_type", - "entry_count", - ] - - # Act - sut = FillLevelsConnector() - - # Assert - self.assertEqual(expected_table_name, sut._table_name) - self.assertEqual(expected_column_names, sut._column_names) - self.assertEqual(mock_clickhouse_batch_sender_instance, sut._batch_sender) - - mock_clickhouse_batch_sender.assert_called_once_with( - table_name=expected_table_name, - column_names=expected_column_names, - ) - - @patch("src.monitoring.clickhouse_connector.ClickHouseBatchSender") - def test_insert_all_given(self, mock_clickhouse_batch_sender): - # Arrange - timestamp = datetime.datetime(2034, 12, 13, 12, 35, 35, 542635) - stage = "test_stage" - entry_type = "test_entry_type" - entry_count = 17 - - sut = FillLevelsConnector() - - with patch.object(sut, "_add_to_batch", MagicMock()) as mock_add_to_batch: - # Act - sut.insert( - timestamp=timestamp, - stage=stage, - entry_type=entry_type, - entry_count=entry_count, - ) - - # Assert - mock_add_to_batch.assert_called_once_with( - [ - datetime.datetime(2034, 12, 13, 12, 35, 35, 542635), - "test_stage", - "test_entry_type", - 17, - ] - ) - - -if __name__ == "__main__": - unittest.main() From 979f02df06ed545e66f5d6a7b761186dbe9e9793 Mon Sep 17 00:00:00 2001 From: Manuel F Date: Thu, 3 Apr 2025 12:45:00 +0200 Subject: [PATCH 31/33] Update test_monitoring_agent.py --- tests/miscellaneous/test_monitoring_agent.py | 153 +++++++------------ 1 file changed, 53 insertions(+), 100 deletions(-) diff --git a/tests/miscellaneous/test_monitoring_agent.py b/tests/miscellaneous/test_monitoring_agent.py index 9c096396..6943c24c 100644 --- a/tests/miscellaneous/test_monitoring_agent.py +++ b/tests/miscellaneous/test_monitoring_agent.py @@ -49,98 +49,43 @@ def test_prepare_all_tables_with_exception( class TestInit(unittest.TestCase): - @patch("src.monitoring.monitoring_agent.ServerLogsConnector") - @patch("src.monitoring.monitoring_agent.ServerLogsTimestampsConnector") - @patch("src.monitoring.monitoring_agent.FailedDNSLoglinesConnector") - @patch("src.monitoring.monitoring_agent.LoglineToBatchesConnector") - @patch("src.monitoring.monitoring_agent.DNSLoglinesConnector") - @patch("src.monitoring.monitoring_agent.LoglineTimestampsConnector") - @patch("src.monitoring.monitoring_agent.BatchTimestampsConnector") - @patch("src.monitoring.monitoring_agent.SuspiciousBatchesToBatchConnector") - @patch("src.monitoring.monitoring_agent.SuspiciousBatchTimestampsConnector") - @patch("src.monitoring.monitoring_agent.AlertsConnector") - @patch("src.monitoring.monitoring_agent.FillLevelsConnector") - @patch("src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler") - def test_init( - self, - mock_kafka_consumer, - mock_fill_levels, - mock_alerts, - mock_suspicious_batch_timestamps, - mock_suspicious_batches_to_batch, - mock_batch_timestamps, - mock_logline_timestamps, - mock_dns_loglines, - mock_logline_to_batches, - mock_failed_dns_loglines, - mock_server_logs_timestamps, - mock_server_logs, - ): - # Arrange - expected_topics = [ - "clickhouse_server_logs", - "clickhouse_server_logs_timestamps", - "clickhouse_failed_dns_loglines", - "clickhouse_logline_to_batches", - "clickhouse_dns_loglines", - "clickhouse_logline_timestamps", - "clickhouse_batch_timestamps", - "clickhouse_suspicious_batches_to_batch", - "clickhouse_suspicious_batch_timestamps", - "clickhouse_alerts", - "clickhouse_fill_levels", - ] - + def test_successful(self): # Act - sut = MonitoringAgent() + with ( + patch( + "src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler" + ) as mock_simple_kafka_consume_handler, + patch( + "src.monitoring.monitoring_agent.ClickHouseBatchSender" + ) as mock_clickhouse_batch_sender, + ): + sut = MonitoringAgent() # Assert - self.assertEqual( - expected_topics, - sut.topics, + self.assertTrue( + isinstance(sut.table_names, list) + and all(isinstance(e, str) for e in sut.table_names) ) - mock_kafka_consumer.assert_called_once_with(expected_topics) + self.assertTrue(all(e.startswith("clickhouse_") for e in sut.topics)) + self.assertIsNotNone(sut.kafka_consumer) + self.assertIsNotNone(sut.batch_sender) + mock_simple_kafka_consume_handler.assert_called_once_with(sut.topics) + mock_clickhouse_batch_sender.assert_called_once() class TestStart(unittest.IsolatedAsyncioTestCase): - @patch("src.monitoring.monitoring_agent.ServerLogsConnector") - @patch("src.monitoring.monitoring_agent.ServerLogsTimestampsConnector") - @patch("src.monitoring.monitoring_agent.FailedDNSLoglinesConnector") - @patch("src.monitoring.monitoring_agent.LoglineToBatchesConnector") - @patch("src.monitoring.monitoring_agent.DNSLoglinesConnector") - @patch("src.monitoring.monitoring_agent.LoglineTimestampsConnector") - @patch("src.monitoring.monitoring_agent.BatchTimestampsConnector") - @patch("src.monitoring.monitoring_agent.SuspiciousBatchesToBatchConnector") - @patch("src.monitoring.monitoring_agent.SuspiciousBatchTimestampsConnector") - @patch("src.monitoring.monitoring_agent.AlertsConnector") - @patch("src.monitoring.monitoring_agent.FillLevelsConnector") - @patch("src.monitoring.monitoring_agent.logger") - @patch("src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler") - @patch("asyncio.get_running_loop") - async def test_handle_kafka_inputs( - self, - mock_get_running_loop, - mock_kafka_consume, - mock_logger, - mock_fill_levels, - mock_alerts, - mock_suspicious_batch_timestamps, - mock_suspicious_batches_to_batch, - mock_batch_timestamps, - mock_logline_timestamps, - mock_dns_loglines, - mock_logline_to_batches, - mock_failed_dns_loglines, - mock_server_logs_timestamps, - mock_server_logs, - ): + def setUp(self): + with ( + patch("src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler"), + patch("src.monitoring.monitoring_agent.ClickHouseBatchSender"), + ): + self.sut = MonitoringAgent() + + async def test_successful(self): # Arrange - sut = MonitoringAgent() - sut.connectors["server_logs"] = Mock() - data_schema = marshmallow_dataclass.class_schema(ServerLogs)() - fixed_id = uuid.uuid4() - timestamp_in = datetime.datetime.now() + fixed_id = uuid.UUID("35871c8c-ff72-44ad-a9b7-4f02cf92d484") + timestamp_in = datetime.datetime(2025, 4, 3, 12, 32, 7, 264410) value = data_schema.dumps( { "message_id": fixed_id, @@ -148,26 +93,34 @@ async def test_handle_kafka_inputs( "message_text": "test_text", } ) - - mock_loop = AsyncMock() - mock_get_running_loop.return_value = mock_loop - sut.kafka_consumer.consume.return_value = ( - "key1", + self.sut.kafka_consumer.consume.return_value = ( + "test_key", value, "clickhouse_server_logs", ) - mock_loop.run_in_executor.side_effect = [ - ("key1", value, "clickhouse_server_logs"), - KeyboardInterrupt(), - ] - - # Act and Assert - await sut.start() - - sut.connectors["server_logs"].insert.assert_called_once_with( - message_id=fixed_id, - timestamp_in=timestamp_in, - message_text="test_text", + + with patch( + "src.monitoring.monitoring_agent.asyncio.get_running_loop" + ) as mock_get_running_loop: + mock_loop = AsyncMock() + mock_get_running_loop.return_value = mock_loop + + mock_loop.run_in_executor.side_effect = [ + ("test_key", value, "clickhouse_server_logs"), + KeyboardInterrupt(), + ] + + # Act + await self.sut.start() + + # Assert + self.sut.batch_sender.add.assert_called_once_with( + "server_logs", + { + "message_id": fixed_id, + "timestamp_in": timestamp_in, + "message_text": "test_text", + }, ) From 00773179299469fa89b9dd9308df418d92b0bcd2 Mon Sep 17 00:00:00 2001 From: "Manuel F." <94921205+lamr02n@users.noreply.github.com> Date: Thu, 3 Apr 2025 14:08:23 +0200 Subject: [PATCH 32/33] Update formatting.yml --- .github/workflows/formatting.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/formatting.yml b/.github/workflows/formatting.yml index 7551980e..43da14d5 100644 --- a/.github/workflows/formatting.yml +++ b/.github/workflows/formatting.yml @@ -29,7 +29,7 @@ jobs: python -m pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt - name: Cache pre-commit hooks - uses: actions/cache@v4.0.2 + uses: actions/cache@v4.0.3 with: path: ~/.cache/pre-commit key: ${{ runner.os }}-precommit-${{ hashFiles('.pre-commit-config.yaml') }} From 97cdf074f8c4d5d52c8dd4bb8fdd74cdd28242f6 Mon Sep 17 00:00:00 2001 From: "Manuel F." <94921205+lamr02n@users.noreply.github.com> Date: Thu, 3 Apr 2025 14:12:42 +0200 Subject: [PATCH 33/33] Update formatting.yml --- .github/workflows/formatting.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/formatting.yml b/.github/workflows/formatting.yml index 43da14d5..90cc4e62 100644 --- a/.github/workflows/formatting.yml +++ b/.github/workflows/formatting.yml @@ -29,7 +29,7 @@ jobs: python -m pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt - name: Cache pre-commit hooks - uses: actions/cache@v4.0.3 + uses: actions/cache@v4 with: path: ~/.cache/pre-commit key: ${{ runner.os }}-precommit-${{ hashFiles('.pre-commit-config.yaml') }}