diff --git a/.github/workflows/formatting.yml b/.github/workflows/formatting.yml index 7551980e..90cc4e62 100644 --- a/.github/workflows/formatting.yml +++ b/.github/workflows/formatting.yml @@ -29,7 +29,7 @@ jobs: python -m pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt - name: Cache pre-commit hooks - uses: actions/cache@v4.0.2 + uses: actions/cache@v4 with: path: ~/.cache/pre-commit key: ${{ runner.os }}-precommit-${{ hashFiles('.pre-commit-config.yaml') }} diff --git a/docker/benchmark_tests/run_test.py b/docker/benchmark_tests/run_test.py index 3f6ab4ab..db859506 100644 --- a/docker/benchmark_tests/run_test.py +++ b/docker/benchmark_tests/run_test.py @@ -157,7 +157,11 @@ def _execute_one_interval( cur_index += 1 except KafkaError: logger.warning(KafkaError) - time.sleep(1.0 / msg_per_sec) + + if msg_per_sec > 0: + time.sleep(1.0 / msg_per_sec) + else: + time.sleep(1.0) logger.warning(f"Finish interval with {msg_per_sec} msg/s") return cur_index @@ -254,7 +258,7 @@ def execute(self): class MaximumThroughputTest(LongTermTest): """Keeps a consistent rate that is too high to be handled.""" - def __init__(self, length_in_min: float | int, msg_per_sec: int = 10000): + def __init__(self, length_in_min: float | int, msg_per_sec: int = 500): super().__init__(full_length_in_min=length_in_min, msg_per_sec=msg_per_sec) @@ -263,16 +267,16 @@ def main(test_type_nr): match test_type_nr: case 1: ramp_up_test = RampUpTest( - msg_per_sec_in_intervals=[1, 10, 50, 100, 150, 200], - interval_length_in_sec=[30, 30, 30, 30, 30, 30], + msg_per_sec_in_intervals=[10, 50, 100, 150], + interval_length_in_sec=[120, 120, 120, 120], ) ramp_up_test.execute() case 2: burst_test = BurstTest( - normal_rate_msg_per_sec=20, - burst_rate_msg_per_sec=10000, - 
normal_rate_interval_length=10, + normal_rate_msg_per_sec=50, + burst_rate_msg_per_sec=1000, + normal_rate_interval_length=120, burst_rate_interval_length=2, number_of_intervals=3, ) @@ -280,7 +284,7 @@ def main(test_type_nr): case 3: maximum_throughput_test = MaximumThroughputTest( - length_in_min=1, + length_in_min=5, ) maximum_throughput_test.execute() @@ -302,4 +306,4 @@ def main(test_type_nr): 3 - Maximum throughput test 4 - Long-term test """ - main(1) + main(3) diff --git a/docker/grafana-provisioning/dashboards/log_volumes.json b/docker/grafana-provisioning/dashboards/log_volumes.json index abbf228c..fbaa7300 100644 --- a/docker/grafana-provisioning/dashboards/log_volumes.json +++ b/docker/grafana-provisioning/dashboards/log_volumes.json @@ -115,8 +115,8 @@ ] }, "gridPos": { - "h": 8, - "w": 9, + "h": 10, + "w": 10, "x": 0, "y": 0 }, @@ -299,9 +299,9 @@ ] }, "gridPos": { - "h": 8, - "w": 7, - "x": 9, + "h": 10, + "w": 9, + "x": 10, "y": 0 }, "id": 73, @@ -346,7 +346,7 @@ "table": "" } }, - "pluginVersion": "4.6.0", + "pluginVersion": "4.8.2", "queryType": "table", "rawSql": "SELECT \n toStartOf${Granularity}(toDateTime64(timestamp_in, 6)) AS time_bucket,\n count(*) AS \" \"\nFROM server_logs\nWHERE timestamp_in >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp_in <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\nGROUP BY time_bucket\nORDER BY time_bucket\nWITH FILL\nFROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\nTO toStartOf${Granularity}(toDateTime64($__toTime, 6))\nSTEP toInterval${Granularity}(1);", "refId": "Entering" @@ -369,15 +369,45 @@ "table": "" } }, - "pluginVersion": "4.7.0", + "pluginVersion": "4.8.2", "queryType": "table", - "rawSql": "SELECT time_bucket, -sum(number) as \"(negative)\"\nFROM (\n SELECT \n toStartOf${Granularity}(toDateTime64(timestamp_failed, 6)) AS time_bucket,\n count(DISTINCT message_text) AS number\n FROM failed_dns_loglines\n WHERE timestamp_failed >= 
toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp_failed <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n GROUP BY time_bucket\n ORDER BY time_bucket\n WITH FILL\n FROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\n TO toStartOf${Granularity}(toDateTime64($__toTime, 6))\n STEP toInterval${Granularity}(1)\n\n UNION ALL\n\n SELECT \n toStartOf${Granularity}(toDateTime64(timestamp, 6)) AS time_bucket,\n count(DISTINCT logline_id) AS number\n FROM logline_timestamps\n WHERE is_active = False\n AND timestamp >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n GROUP BY time_bucket\n ORDER BY time_bucket\n WITH FILL\n FROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\n TO toStartOf${Granularity}(toDateTime64($__toTime, 6))\n STEP toInterval${Granularity}(1)\n)\nGROUP BY time_bucket;", + "rawSql": "SELECT time_bucket, -sum(count) AS \" (negative)\"\nFROM (\n SELECT toStartOf${Granularity}(timestamp) AS time_bucket, count(DISTINCT logline_id) AS count\n FROM (\n SELECT logline_id, timestamp\n FROM (\n SELECT logline_id, timestamp, ROW_NUMBER() OVER (PARTITION BY logline_id ORDER BY timestamp DESC) AS rn\n FROM logline_timestamps\n WHERE is_active = false\n )\n WHERE rn = 1\n AND timestamp >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n )\n GROUP BY time_bucket\n\n UNION ALL\n\n SELECT toStartOf${Granularity}(timestamp_failed) AS time_bucket, count(DISTINCT message_text) AS count\n FROM failed_dns_loglines\n WHERE timestamp_failed >= toStartOf${Granularity}(toDateTime64($__fromTime, 6)) AND timestamp_failed <= toStartOf${Granularity}(toDateTime64($__toTime, 6))\n GROUP BY time_bucket\n)\nGROUP BY time_bucket\nORDER BY time_bucket\nWITH FILL\nFROM toStartOf${Granularity}(toDateTime64($__fromTime, 6))\nTO toStartOf${Granularity}(toDateTime64($__toTime, 6))\nSTEP 
toInterval${Granularity}(1);", "refId": "Processed" } ], "title": "Entering and processed log lines per ${Granularity}", "type": "barchart" }, + { + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "gridPos": { + "h": 7, + "w": 5, + "x": 19, + "y": 0 + }, + "id": 74, + "options": { + "folderUID": "", + "includeVars": true, + "keepTime": true, + "maxItems": 10, + "query": "", + "showFolderNames": false, + "showHeadings": false, + "showRecentlyViewed": false, + "showSearch": true, + "showStarred": false, + "tags": [] + }, + "pluginVersion": "11.2.2+security-01", + "title": "Dashboards", + "transparent": true, + "type": "dashlist" + }, { "datasource": { "default": false, @@ -405,17 +435,17 @@ "overrides": [] }, "gridPos": { - "h": 5, - "w": 3, - "x": 16, - "y": 0 + "h": 3, + "w": 5, + "x": 19, + "y": 7 }, "id": 20, "options": { "colorMode": "value", "graphMode": "none", "justifyMode": "auto", - "orientation": "horizontal", + "orientation": "vertical", "percentChangeColorMode": "standard", "reduceOptions": { "calcs": [ @@ -453,7 +483,7 @@ "refId": "A" } ], - "title": "Max. 
number of entries per Batch", + "title": "Maximum Batch fill levels", "transformations": [ { "id": "rowsToFields", @@ -477,43 +507,13 @@ ], "type": "stat" }, - { - "datasource": { - "type": "datasource", - "uid": "grafana" - }, - "gridPos": { - "h": 7, - "w": 5, - "x": 19, - "y": 0 - }, - "id": 74, - "options": { - "folderUID": "", - "includeVars": true, - "keepTime": true, - "maxItems": 10, - "query": "", - "showFolderNames": false, - "showHeadings": false, - "showRecentlyViewed": false, - "showSearch": true, - "showStarred": false, - "tags": [] - }, - "pluginVersion": "11.2.2+security-01", - "title": "Dashboards", - "transparent": true, - "type": "dashlist" - }, { "collapsed": false, "gridPos": { "h": 1, "w": 24, "x": 0, - "y": 8 + "y": 10 }, "id": 72, "panels": [], @@ -547,7 +547,7 @@ "tooltip": false, "viz": false }, - "insertNulls": false, + "insertNulls": 60000, "lineInterpolation": "stepAfter", "lineWidth": 1, "pointSize": 5, @@ -604,7 +604,7 @@ "h": 9, "w": 15, "x": 0, - "y": 9 + "y": 11 }, "id": 55, "options": { @@ -736,7 +736,7 @@ "refId": "BatchHandler" } ], - "title": "Log Volume Combined", + "title": "Log volume combined", "type": "timeseries" }, { @@ -792,7 +792,7 @@ "h": 9, "w": 9, "x": 15, - "y": 9 + "y": 11 }, "id": 47, "options": { @@ -869,7 +869,7 @@ "h": 1, "w": 24, "x": 0, - "y": 18 + "y": 20 }, "id": 59, "panels": [], @@ -896,8 +896,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 27, - "gradientMode": "opacity", + "fillOpacity": 11, + "gradientMode": "none", "hideFrom": { "legend": false, "tooltip": false, @@ -905,9 +905,6 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", - "lineStyle": { - "fill": "solid" - }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -923,6 +920,7 @@ "mode": "off" } }, + "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -930,10 +928,6 @@ { "color": "green", "value": null - }, - { - "color": "red", - "value": 80 } ] } @@ -944,15 
+938,15 @@ "h": 6, "w": 6, "x": 0, - "y": 19 + "y": 21 }, - "id": 51, + "id": 50, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "mode": "single", @@ -980,34 +974,11 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Batch" - }, - { - "datasource": { - "type": "grafana-clickhouse-datasource", - "uid": "PDEE91DDB90597936" - }, - "editorType": "sql", - "format": 1, - "hide": false, - "meta": { - "builderOptions": { - "columns": [], - "database": "", - "limit": 1000, - "mode": "list", - "queryType": "table", - "table": "" - } - }, - "pluginVersion": "4.5.1", - "queryType": "table", - "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "Buffer" + "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "A" } ], - "title": "BatchHandler Fill State", + "title": "Collector Fill State", "type": "timeseries" }, { @@ -1042,7 +1013,7 @@ "h": 3, "w": 5, "x": 6, - "y": 19 + "y": 21 }, "id": 60, "options": { @@ -1159,7 +1130,7 @@ "h": 6, "w": 6, "x": 12, - "y": 19 + "y": 21 }, "id": 52, "options": { @@ -1234,7 +1205,7 @@ "h": 3, "w": 5, "x": 18, - "y": 19 + "y": 21 }, "id": 64, "options": { @@ -1329,7 +1300,7 @@ "h": 3, "w": 5, "x": 6, - "y": 22 + "y": 24 }, "id": 61, "options": { @@ -1423,7 +1394,7 @@ "h": 3, "w": 5, "x": 18, - "y": 22 + "y": 24 }, "id": 67, "options": { @@ 
-1491,8 +1462,8 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 11, - "gradientMode": "none", + "fillOpacity": 27, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, @@ -1500,6 +1471,9 @@ }, "insertNulls": false, "lineInterpolation": "stepAfter", + "lineStyle": { + "fill": "solid" + }, "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -1515,7 +1489,6 @@ "mode": "off" } }, - "fieldMinMax": false, "mappings": [], "thresholds": { "mode": "absolute", @@ -1523,6 +1496,10 @@ { "color": "green", "value": null + }, + { + "color": "red", + "value": 80 } ] } @@ -1533,15 +1510,15 @@ "h": 6, "w": 6, "x": 0, - "y": 25 + "y": 27 }, - "id": 50, + "id": 51, "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": false + "showLegend": true }, "tooltip": { "mode": "single", @@ -1569,11 +1546,34 @@ }, "pluginVersion": "4.5.1", "queryType": "table", - "rawSql": "SELECT *\nFROM fill_levels\nWHERE stage = 'log_collection.collector'\n AND entry_type = 'total_loglines'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", - "refId": "A" + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_batches'\n AND timestamp >= $__fromTime AND timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Batch" + }, + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "PDEE91DDB90597936" + }, + "editorType": "sql", + "format": 1, + "hide": false, + "meta": { + "builderOptions": { + "columns": [], + "database": "", + "limit": 1000, + "mode": "list", + "queryType": "table", + "table": "" + } + }, + "pluginVersion": "4.5.1", + "queryType": "table", + "rawSql": "SELECT timestamp, entry_count AS \" \"\nFROM fill_levels\nWHERE stage = 'log_collection.batch_handler'\n AND entry_type = 'total_loglines_in_buffer'\n AND timestamp >= $__fromTime AND 
timestamp <= $__toTime\nORDER BY timestamp ASC;", + "refId": "Buffer" } ], - "title": "Collector Fill State", + "title": "BatchHandler Fill State", "type": "timeseries" }, { @@ -1608,7 +1608,7 @@ "h": 3, "w": 3, "x": 6, - "y": 25 + "y": 27 }, "id": 62, "options": { @@ -1688,7 +1688,7 @@ "h": 3, "w": 2, "x": 9, - "y": 25 + "y": 27 }, "id": 68, "options": { @@ -1805,7 +1805,7 @@ "h": 6, "w": 6, "x": 12, - "y": 25 + "y": 27 }, "id": 53, "options": { @@ -1880,7 +1880,7 @@ "h": 3, "w": 5, "x": 18, - "y": 25 + "y": 27 }, "id": 66, "options": { @@ -1975,7 +1975,7 @@ "h": 3, "w": 5, "x": 6, - "y": 28 + "y": 30 }, "id": 63, "options": { @@ -2092,7 +2092,7 @@ "h": 3, "w": 5, "x": 18, - "y": 28 + "y": 30 }, "id": 65, "options": { @@ -2208,7 +2208,7 @@ "h": 6, "w": 6, "x": 12, - "y": 31 + "y": 33 }, "id": 54, "options": { @@ -2283,7 +2283,7 @@ "h": 3, "w": 5, "x": 18, - "y": 31 + "y": 33 }, "id": 70, "options": { @@ -2350,7 +2350,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null } ] } @@ -2377,7 +2378,7 @@ "h": 3, "w": 5, "x": 18, - "y": 34 + "y": 36 }, "id": 69, "options": { @@ -2431,7 +2432,7 @@ "h": 1, "w": 24, "x": 0, - "y": 37 + "y": 39 }, "id": 76, "panels": [], @@ -2448,7 +2449,7 @@ "h": 7, "w": 24, "x": 0, - "y": 38 + "y": 40 }, "id": 75, "options": { @@ -2578,7 +2579,7 @@ }, { "current": { - "selected": false, + "selected": true, "text": [ "All" ], diff --git a/docs/api/index.rst b/docs/api/index.rst index a4f614fe..9460ce8f 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -10,6 +10,7 @@ API inspector logcollector logserver + monitoring prefilter train version diff --git a/docs/conf.py b/docs/conf.py index 0344676d..66d5d497 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -19,7 +19,7 @@ # -- Project information project = "heiDGAF" -copyright = "2024, Stefan Machmeier and Manuel Fuchs" +copyright = "2025, Stefan Machmeier and Manuel Fuchs" author = "Stefan Machmeier and Manuel Fuchs" # 
exec(open("../src/version.py").read()) diff --git a/docs/configuration.rst b/docs/configuration.rst index 5696a83c..9d0c22f2 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -1,3 +1,65 @@ +Logline format configuration +............................ + +Users can define the format and fields of their DNS server loglines. For this, change the +``pipeline.log_collection.collector.logline_format`` parameter. + +For example, a logline might look like this: + +.. code-block:: console + + 2025-04-04T14:45:32.458Z NXDOMAIN 192.168.3.152 10.10.0.3 test.com AAAA 192.168.15.34 196b + +Each list entry of the parameter defines one field of the input logline, and the order of the entries corresponds to the +order of the values in each logline. Each list entry itself consists of a list with +three or four entries. For example, a field definition might look like this: + +.. code-block:: console + + [ "status_code", ListItem, [ "NOERROR", "NXDOMAIN" ], [ "NXDOMAIN" ] ] + +The first entry always corresponds to the name of the field. Some field values must exist in the logline, as they are +used by the modules. Some field names cannot be used, as they are defined for internal communication. + +.. list-table:: Required and forbidden field names + :header-rows: 0 + :widths: 15 50 + + * - Required + - ``timestamp``, ``status_code``, ``client_ip``, ``record_type``, ``domain_name`` + * - Forbidden + - ``logline_id``, ``batch_id`` + +The second entry specifies the type of the field. Depending on the type defined here, the method for defining the +possible values varies. The third and fourth entry change depending on the type. +Please check the following table for more information on the types. + +There are three types to choose from: + +..
list-table:: Field types + :header-rows: 1 + :widths: 20 20 20 30 + + * - Field type + - Format of 3rd entry + - Format of 4th entry + - Description + * - ``RegEx`` (Regular Expression) + - RegEx pattern as string + - + - The logline field is checked against the pattern. If the pattern is met, the field is valid. + * - ``ListItem`` + - List of values + - List of values (optional) + - If the logline field value is in the first list, it is valid. If it is also in the second list, it is relevant + for the inspection and detection algorithm. All values in the second list must also be in the first list, not + vice versa. If this entry is not specified, all values are deemed relevant. + * - ``IpAddress`` + - + - + - If the logline field value is an IPv4 or IPv6 address, it is valid. + + Logging Configuration ..................... @@ -33,17 +95,11 @@ functionality of the modules. * - Parameter - Default Value - Description - * - input_kafka_topic - - ``"LogServer"`` - - Kafka topic for incoming log lines. * - input_file - ``"/opt/file.txt"`` - - File for appending new log lines continuously. + - Path of the input file, to which data is appended during usage. - Keep this setting unchanged if using Docker; modify the ``MOUNT_PATH`` in ``docker/.env`` instead. - * - max_number_of_connections - - ``1000`` - - Maximum number of simultaneous connections for sending and receiving. + Keep this setting unchanged when using Docker; modify the ``MOUNT_PATH`` in ``docker/.env`` instead. ``pipeline.log_collection`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -55,7 +111,8 @@ functionality of the modules. * - Parameter - Description * - logline_format - - Defines the expected format for incoming log lines. See the TODO section for more details. + - Defines the expected format for incoming log lines. See the :ref:`Logline format configuration` section for more + details. .. list-table:: ``batch_handler`` Parameters :header-rows: 1 @@ -65,14 +122,18 @@ functionality of the modules. 
- Default Value - Description * - batch_size - - ``1000`` - - TODO + - ``10000`` + - Number of entries in a Batch, at which it is sent due to reaching the maximum fill state. * - batch_timeout - - ``20.0`` - - TODO - * - subnet.subnet_bits + - ``30.0`` + - Time after which a Batch is sent. Mainly relevant for Batches that only contain a small number of entries, and + do not reach the size limit for a longer time period. + * - subnet_id.ipv4_prefix_length - ``24`` - - The number of bits to trim from the client's IPv4 address for use as ``subnet_id``. + - The number of bits to trim from the client's IPv4 address for use as `Subnet ID`. + * - subnet_id.ipv6_prefix_length + - ``64`` + - The number of bits to trim from the client's IPv6 address for use as `Subnet ID`. ``pipeline.data_inspection`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -132,10 +193,10 @@ functionality of the modules. - Default Value - Description * - model - - ``xg`` + - ``rf`` - TODO * - checksum - - + - Not given here - TODO * - base_url - https://heibox.uni-heidelberg.de/d/0d5cbcbe16cd46a58021/ @@ -158,16 +219,13 @@ The following parameters control the infrastructure of the software. - Description * - timestamp_format - ``"%Y-%m-%dT%H:%M:%S.%fZ"`` - - TODO + - Timestamp format used by the Inspector. Will be removed soon. * - kafka_brokers - - TODO - - TODO - * - logserver.hostname - - ``172.27.0.8`` - - The hostname or IP address that the :class:`LogServer` will use to start and bind to the network interface. - * - logserver.port_in - - ``9998`` - - The port on which the :class:`LogServer` will listen for incoming log lines. - * - logserver.port_out - - ``9999`` - - The port on which the :class:`LogServer` is available for collecting instances. Any instance connecting to this port will receive the latest log line stored on the server. + - ``hostname: kafka1, port: 8097``, ``hostname: kafka2, port: 8098``, ``hostname: kafka3, port: 8099`` + - Hostnames and ports of the Kafka brokers, given as list. 
+ * - kafka_topics + - Not given here + - Kafka topic names given as strings. These topics are used for the data transfer between the modules. + * - monitoring.clickhouse_server.hostname + - ``clickhouse-server`` + - Hostname of the ClickHouse server. Used by Grafana. diff --git a/docs/index.rst b/docs/index.rst index 047ea3cb..2e064478 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,17 +1,19 @@ Welcome to heiDGAF's documentation! =================================== -**heiDGAF** is a machine learning based DNS classifier for detecting Domain Generation Algorithms (DGAs), tunneling, and +**heiDGAF** is a machine learning-based DNS classifier for detecting Domain Generation Algorithms (DGAs), tunneling, and data exfiltration by malicious actors. Check out the :doc:`usage` section for further information on how to use the software, including how to :ref:`install <installation>` and :ref:`configure <configuration>` the project. For more details on the implementation -and structure, take a look at the :doc:`pipeline` section. +and structure, take a look at the :doc:`pipeline` section. The :doc:`monitoring` section describes how to set up the +monitoring environment for observing the software's functionality in real time. .. note:: This project is under active development.
+ Contents -------- @@ -20,6 +22,7 @@ Contents usage pipeline + monitoring training api/index references diff --git a/docs/media/monitoring_pipeline.png b/docs/media/monitoring_pipeline.png new file mode 100644 index 00000000..a5f95511 Binary files /dev/null and b/docs/media/monitoring_pipeline.png differ diff --git a/docs/media/pipeline_overview.pdf b/docs/media/pipeline_overview.pdf deleted file mode 100644 index d04038d9..00000000 Binary files a/docs/media/pipeline_overview.pdf and /dev/null differ diff --git a/docs/media/pipeline_overview.png b/docs/media/pipeline_overview.png index dea71b11..debd8988 100644 Binary files a/docs/media/pipeline_overview.png and b/docs/media/pipeline_overview.png differ diff --git a/docs/monitoring.rst b/docs/monitoring.rst new file mode 100644 index 00000000..638a215c --- /dev/null +++ b/docs/monitoring.rst @@ -0,0 +1,50 @@ +Monitoring +~~~~~~~~~~ + +.. note:: + + This page is under active development. + +Overview +======== + +The software includes a monitoring functionality that stores relevant information in a database (`ClickHouse`). The +collected data is then visualized using multiple `Grafana` dashboard views. + +.. image:: media/monitoring_pipeline.png + + +Setup +===== + +Normal mode +----------- + +Both `ClickHouse` and `Grafana` can be executed as their own Docker containers. All needed containers are started +when executing: + +.. code-block:: console + + $ docker compose -f docker/docker-compose.yml up + + +All modules send their monitoring-relevant information to Kafka, from which it is then collected by the +`Monitoring Agent` module. This module checks their validity and then stores the values in `ClickHouse`. By the +default configuration defined in ``docker-compose.yml``, `Grafana` automatically loads the dashboard views and fills +them with the data in `ClickHouse`. The dashboard views can then be observed on ``localhost:3000`` (by default).
+ +`Datatest` mode +--------------- + +For users interested in testing their own machine learning models used by the detection algorithm in the `Data Analysis` +stage, the monitoring functionality can be started in the `datatest` mode: + +.. code-block:: console + + $ docker compose -f docker/docker-compose.datatests.yml up + +`Grafana` then shows one more dashboard view, `Datatests`, that shows the confusion matrix for a testing dataset. + +.. warning:: + + This feature is in an early development stage! diff --git a/docs/pipeline.rst b/docs/pipeline.rst index bde168ef..6d56c17f 100644 --- a/docs/pipeline.rst +++ b/docs/pipeline.rst @@ -1,38 +1,25 @@ Pipeline ~~~~~~~~ -.. note:: - - This page is under active development. - Overview ======== +The core component of the software's architecture is its data pipeline. It consists of five stages/modules, and data +traverses through it using Apache Kafka. + .. image:: media/pipeline_overview.png Stage 1: Log Storage ==================== -The primary goal of this stage is to temporarily store incoming loglines until they can be processed by subsequent -stages of the pipeline. This buffering approach ensures that incoming log data is not lost and can be processed -efficiently when the next stages are ready. - -Once the data is stored, it can be retrieved by the next stage of the pipeline, referred to as -:ref:`Stage 2: Log Collection`, which connects to the server. This design allows multiple -:ref:`Log Collection` instances to connect simultaneously, enabling load balancing and -efficient distribution of processing tasks. +This stage serves as the central contact point for all data. Overview -------- -The :class:`LogServer` class is the core component of this stage. It opens two types of network ports: - -1. **Incoming Port** (``port_in``): Used to receive loglines from various sources. -2. **Outgoing Port** (``port_out``): Used to send stored loglines to connected components. 
- -This setup facilitates the asynchronous handling of log data, allowing various input sources to transmit loglines and -multiple processing components to retrieve them as needed. +The :class:`LogServer` class is the core component of this stage. It reads from several input sources and sends the +data to Kafka, where it can be obtained by the following module. Main Class ---------- @@ -40,45 +27,23 @@ Main Class .. py:currentmodule:: src.logserver.server .. autoclass:: LogServer -Usage ------ +Usage and configuration +----------------------- -The :class:`LogServer` operates with two types of connections: +Currently, the :class:`LogServer` reads from both an input file and a Kafka topic, simultaneously. The configuration +allows changing the file name to read from. -- **Incoming Connections** (``port_in``): +- **Without Docker**: - - Components can connect to the incoming port to send loglines to the server. These components can include: + - To change the input file path, change ``pipeline.log_storage.logserver.input_file`` in the `config.yaml`. The + default setting is ``"/opt/file.txt"``. - - Direct APIs from systems generating log data. - - Mock logline generators for testing. - - File readers that parse log files and send individual log entries to the server. - - - The server is designed to handle multiple concurrent sources, allowing diverse and simultaneous input streams. - -- **Outgoing Connections** (``port_out``): - - - Components can connect to the outgoing port to retrieve the next available logline from the server. - - If no loglines are available at the time of the connection, the server will return ``None``. - -This dual-port architecture allows for flexibility and scalability in log data handling. - -Configuration -------------- +- **With Docker**: -Configuration settings for the :class:`LogServer` are managed in the `config.yaml` file (key: ``heidgaf.logserver``). 
-The default settings include: - -- **Port Configuration**: - - - ``port_in``: Default is ``9998``. - - ``port_out``: Default is ``9999``. - -- **Connection Limits**: - - - ``max_number_of_connections``: The maximum number of simultaneous connections allowed. Default is set to ``1000``. - -These settings can be adjusted to meet specific deployment requirements, allowing customization of network ports and -connection handling. + - Docker mounts the file specified in ``MOUNT_PATH`` in the file `docker/.env`. By default, this is set to + ``../../default.txt``, which refers to the file `docker/default.txt`. + - By changing this variable, the file to be mounted can be set. Please note that in this case, the variable specified + in the `config.yaml` must be set to the default value. Stage 2: Log Collection @@ -150,7 +115,7 @@ validates. The logline is parsed into its respective fields, each checked for co - **Log Line Format**: - - Log lines have the format: + - By default, log lines have the following format: .. code-block:: @@ -194,6 +159,8 @@ validates. The logline is parsed into its respective fields, each checked for co | | bytes. | +----------------------+------------------------------------------------+ + - Users can change the format and field types, as described in the :ref:`Logline format configuration` section. + BufferedBatch ............. @@ -224,31 +191,15 @@ The :class:`CollectorKafkaBatchSender` manages the sending of validated loglines Configuration ------------- -Configuration settings for the :class:`LogCollector` and :class:`CollectorKafkaBatchSender` are managed in the -`config.yaml` file (keys: ``heidgaf.collector``, ``kafka.batch_sender`` and ``heidgaf.subnet``): +The :class:`LogCollector` checks the validity of incoming loglines. For this, it uses the ``logline_format`` configured +in the ``config.yaml``. - **LogCollector Analyzation Criteria**: - - ``valid_status_codes``: The accepted status codes for logline validation. 
Default list contains ``NOERROR`` - and ``NXDOMAIN``. - - ``valid_record_types``: The accepted DNS record types for logline validation. Default list contains ``A`` and - ``AAAA``. - -- **Batch Configuration**: - - - ``batch_size``: The maximum number of loglines per batch. Default is ``1000``. - - ``timeout_seconds``: The time interval (in seconds) after which the batch is sent, regardless of size. Default - is ``60``. - -- **Number of bits used in Subnet ID**: - - - ``subnet_bits``: The number of bits, after which to cut off the client's IP address to use as ``subnet_id``. Default - is ``24``. - -- **Kafka Topics**: - - - **Output Topic**: ``Prefilter`` - After collection, the processed log data is published to this topic for subsequent - stages. + - Valid status codes: The accepted status codes for logline validation. This is defined in the field with name + ``"status_code"`` in the ``logline_format`` list. + - Valid record types: The accepted DNS record types for logline validation. This is defined in the field with name + ``"record_type"`` in the ``logline_format`` list. Buffer Functionality -------------------- @@ -351,20 +302,13 @@ for further processing in subsequent stages. Configuration ------------- -To configure the :class:`Prefilter` and customize the filtering behavior, the following options are available: - -- **Error Types**: - - - When creating an instance of :class:`Prefilter`, a list of error types is passed as an argument. This list defines - the types of errors that should be retained in the filtering process. - - **Example**: If the filter is configured with the list ``["NXDOMAIN"]``, only logs with error status - ``NXDOMAIN`` will be processed and sent to the ``Inspect`` topic. +To customize the filtering behavior, the following options in the ``logline_format`` set +in the ``config.yaml`` are used. 
-- **Kafka Topics**: +- **Relevant Types**: - - **Input Topic**: ``Prefilter`` - This is the Kafka topic from which the `Prefilter` loads the incoming log data. - - **Output Topic**: ``Inspect`` - After filtering, the processed log data is published to this topic for subsequent - stages. + - If the fourth entry of the field configuration with type ``ListItem`` in the ``logline_format`` list is defined for + any field name, the values in this list are the relevant values. Stage 4: Inspection diff --git a/docs/usage.rst b/docs/usage.rst index 8f03a7d8..f84ed34a 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -8,23 +8,24 @@ Usage .. _installation: .. _configuration: + Getting Started --------------- -If you want to use heiDGAF, just use the provided ``docker-compose.yml`` to quickly bootstrap your environment: +To use heiDGAF, just use the provided ``docker-compose.yml`` to quickly bootstrap your environment: .. code-block:: console $ docker compose -f docker/docker-compose.yml up -Run container individually: - +If you want to run containers individually, use: .. code-block:: console $ docker compose -f docker/docker-compose.kafka.yml up $ docker run ... + Installation ------------ @@ -40,19 +41,16 @@ Install all Python requirements. .. code-block:: console - (.venv) $ pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt + (.venv) $ sh install_requirements.sh -Now, you can start each stage, e.g. the inspector: +Now, you can start each module, e.g. the `Inspector`: .. code-block:: console (.venv) $ python src/inspector/main.py + Configuration ------------- -.. note:: - - This section will be updated to show a full table of all configuration values, and their default values. - .. 
include:: configuration.rst diff --git a/src/logcollector/collector.py b/src/logcollector/collector.py index 5776eda7..4a7d55bc 100644 --- a/src/logcollector/collector.py +++ b/src/logcollector/collector.py @@ -25,7 +25,12 @@ "ipv6_prefix_length" ] TIMESTAMP_FORMAT = config["environment"]["timestamp_format"] -REQUIRED_FIELDS = ["timestamp", "status_code", "client_ip", "record_type"] +REQUIRED_FIELDS = [ + "timestamp", + "status_code", + "client_ip", + "record_type", +] BATCH_SIZE = config["pipeline"]["log_collection"]["batch_handler"]["batch_size"] CONSUME_TOPIC = config["environment"]["kafka_topics"]["pipeline"][ "logserver_to_collector" diff --git a/src/monitoring/clickhouse_batch_sender.py b/src/monitoring/clickhouse_batch_sender.py index c8435c81..f01134df 100644 --- a/src/monitoring/clickhouse_batch_sender.py +++ b/src/monitoring/clickhouse_batch_sender.py @@ -207,8 +207,8 @@ def insert(self, table_name: str): def insert_all(self): """Inserts the batch for every table.""" - for e in self.batch: - self.insert(e) + for table in self.batch: + self.insert(table) if self.timer: self.timer.cancel() diff --git a/tests/clickhouse/test_clickhouse_batch_sender.py b/tests/clickhouse/test_clickhouse_batch_sender.py index fe19937b..b91ab614 100644 --- a/tests/clickhouse/test_clickhouse_batch_sender.py +++ b/tests/clickhouse/test_clickhouse_batch_sender.py @@ -51,231 +51,232 @@ def test_verify_with_optional_value(self): class TestInit(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.BATCH_SIZE", 50) - @patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5) - @patch("src.monitoring.clickhouse_batch_sender.CLICKHOUSE_HOSTNAME", "test_name") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_init(self, mock_clickhouse_connect): + def test_successful(self): # Act - sut = ClickHouseBatchSender() + with ( + patch("src.monitoring.clickhouse_batch_sender.BATCH_SIZE", 50), + 
patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5), + patch( + "src.monitoring.clickhouse_batch_sender.CLICKHOUSE_HOSTNAME", + "test_name", + ), + patch( + "src.monitoring.clickhouse_batch_sender.clickhouse_connect" + ) as mock_clickhouse_connect, + ): + sut = ClickHouseBatchSender() # Assert + self.assertIsNotNone(sut.tables) self.assertEqual(50, sut.max_batch_size) self.assertEqual(0.5, sut.batch_timeout) self.assertIsNone(sut.timer) - self.assertEqual(dict, type(sut.batch)) - - mock_clickhouse_connect.get_client.assert_called_once_with(host="test_name") + self.assertIsNotNone(sut.lock) + self.assertEqual({key: [] for key in sut.tables}, sut.batch) class TestDel(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_del(self, mock_clickhouse_connect): - # Arrange - table_name = "test_table_name" - column_names = ["col_1", "col_2"] - sut = ClickHouseBatchSender() + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + def test_del(self): # Act with patch( "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all" ) as mock_insert_all: - del sut + del self.sut # Assert mock_insert_all.assert_called_once() class TestAdd(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.Table") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_list_of_str_successful( - self, mock_clickhouse_connect, mock_start_timer, mock_insert, mock_table - ): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_single_list_with_starting_timer(self): # Arrange - table_name = "fill_levels" - sut = ClickHouseBatchSender() - - now = 
datetime.datetime.now() - data = { - "timestamp": now, - "stage": "test_stage", - "entry_type": "test_type", - "entry_count": 23, - } + test_table_name = "test_table" + test_data = {"value_1": 1, "value_2": 2} + + self.sut.tables = {test_table_name: Table(test_table_name, {})} + self.sut.batch = {test_table_name: []} # Act - sut.add(table_name, data) + with ( + patch("src.monitoring.clickhouse_batch_sender.Table.verify"), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer" + ) as mock_start_timer, + ): + self.sut.add(test_table_name, test_data) # Assert - self.assertEqual( - [[now, "test_stage", "test_type", 23]], sut.batch.get(table_name) - ) - - mock_insert.assert_not_called() + self.sut.batch = {test_table_name: [1, 2]} mock_start_timer.assert_called_once() - @patch("src.monitoring.clickhouse_batch_sender.Table") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_timer_already_started( - self, mock_clickhouse_connect, mock_start_timer, mock_insert, mock_table - ): + def test_timer_already_started(self): # Arrange - table_name = "fill_levels" - sut = ClickHouseBatchSender() - - now = datetime.datetime.now() - data = { - "timestamp": now, - "stage": "test_stage", - "entry_type": "test_type", - "entry_count": 23, - } - sut.timer = Mock() + test_table_name = "test_table" + test_data = {"value_1": 1, "value_2": 2} + + self.sut.tables = {test_table_name: Table(test_table_name, {})} + self.sut.batch = {test_table_name: []} + self.sut.timer = Mock() # Act - sut.add(table_name, data) + with ( + patch("src.monitoring.clickhouse_batch_sender.Table.verify"), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer" + ) as mock_start_timer, + ): + self.sut.add(test_table_name, test_data) # Assert - self.assertEqual( 
- [[now, "test_stage", "test_type", 23]], sut.batch.get(table_name) - ) - - mock_insert.assert_not_called() mock_start_timer.assert_not_called() - @patch("src.monitoring.clickhouse_batch_sender.Table") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert") - @patch("src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_add_max_size_reached_and_timer_already_started( - self, mock_clickhouse_connect, mock_start_timer, mock_insert, mock_table - ): + def test_max_batch_size_reached(self): # Arrange - table_name = "fill_levels" - sut = ClickHouseBatchSender() - - now = datetime.datetime.now() - data = { - "timestamp": now, - "stage": "test_stage", - "entry_type": "test_type", - "entry_count": 23, - } - sut.timer = Mock() - sut.max_batch_size = 1 + test_table_name = "test_table" + test_data = {"value_1": 1, "value_2": 2} + + self.sut.tables = {test_table_name: Table(test_table_name, {})} + self.sut.batch = {test_table_name: []} + self.sut.max_batch_size = 1 # Act - sut.add(table_name, data) + with ( + patch("src.monitoring.clickhouse_batch_sender.Table.verify"), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert" + ) as mock_insert, + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender._start_timer" + ), + ): + self.sut.add(test_table_name, test_data) # Assert - self.assertEqual( - [[now, "test_stage", "test_type", 23]], sut.batch.get(table_name) - ) + mock_insert.assert_called_once_with(test_table_name) - mock_insert.assert_called_once() - mock_start_timer.assert_not_called() +class TestInsert(unittest.TestCase): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() -class TestInsertAll(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_insert_all(self, 
mock_clickhouse_connect): + def test_filled_batch(self): # Arrange - table_name = "fill_levels" - sut = ClickHouseBatchSender() - sut._client = Mock() - first = datetime.datetime.now() - second = datetime.datetime.now() - sut.batch[table_name] = [ - [first, "test_stage", "test_type", 23], - [second, "test_stage", "test_type", 24], - ] + test_table_name = "test_table" + + self.sut.tables = { + test_table_name: Table(test_table_name, {"col_1": str, "col_2": str}) + } + self.sut.batch = {test_table_name: ["one", "two", "three"]} + self.sut._client = Mock() # Act - sut.insert_all() + self.sut.insert(test_table_name) # Assert - self.assertEqual([], sut.batch.get(table_name)) - self.assertIsNone(sut.timer) - - sut._client.insert.assert_called_once_with( - table_name, - [ - [first, "test_stage", "test_type", 23], - [second, "test_stage", "test_type", 24], - ], - column_names=["timestamp", "stage", "entry_type", "entry_count"], + self.sut._client.insert.assert_called_once_with( + test_table_name, + ["one", "two", "three"], + column_names=["col_1", "col_2"], ) + self.assertEquals([], self.sut.batch[test_table_name]) - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_insert_all_with_timer(self, mock_clickhouse_connect): + def test_empty_batch(self): # Arrange - table_name = "fill_levels" - sut = ClickHouseBatchSender() - sut._client = Mock() - sut.timer = Mock() - first = datetime.datetime.now() - second = datetime.datetime.now() - sut.batch[table_name] = [ - [first, "test_stage", "test_type", 23], - [second, "test_stage", "test_type", 24], - ] + test_table_name = "test_table" + + self.sut.tables = { + test_table_name: Table(test_table_name, {"col_1": str, "col_2": str}) + } + self.sut.batch = {test_table_name: []} + self.sut._client = Mock() # Act - sut.insert_all() + self.sut.insert(test_table_name) # Assert - self.assertEqual([], sut.batch.get(table_name)) - self.assertIsNone(sut.timer) + self.sut._client.insert.assert_not_called() + 
self.assertEquals([], self.sut.batch[test_table_name]) - sut._client.insert.assert_called_once_with( - table_name, - [ - [first, "test_stage", "test_type", 23], - [second, "test_stage", "test_type", 24], - ], - column_names=["timestamp", "stage", "entry_type", "entry_count"], - ) + +class TestInsertAll(unittest.TestCase): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_successful(self): + # Arrange + test_table_name_1 = "test_table_1" + test_table_name_2 = "test_table_2" + + self.sut.tables = { + test_table_name_1: Table(test_table_name_1, {}), + test_table_name_2: Table(test_table_name_2, {}), + } + self.sut.batch = {test_table_name_1: [1, 2, 3], test_table_name_2: [4, 5]} + self.sut.timer = Mock() + + # Act + with patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert" + ) as mock_insert: + self.sut.insert_all() + + # Assert + + mock_insert.assert_any_call(test_table_name_1) + mock_insert.assert_any_call(test_table_name_2) class TestStartTimer(unittest.TestCase): - @patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5) - @patch("src.monitoring.clickhouse_batch_sender.Timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_start_timer(self, mock_clickhouse_connect, mock_timer): + def setUp(self): + with patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect"): + self.sut = ClickHouseBatchSender() + + def test_without_existing_timer(self): # Arrange - sut = ClickHouseBatchSender() + self.sut.timer = None # Act - sut._start_timer() + with ( + patch("src.monitoring.clickhouse_batch_sender.Timer") as mock_timer, + patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 5), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all" + ) as mock_insert_all, + ): + self.sut._start_timer() # Assert - mock_timer.assert_called_once_with( - 0.5, - sut.insert_all, - ) - 
mock_timer.cancel.assert_not_called() - sut.timer.start.assert_called_once() + mock_timer.assert_called_once_with(5, mock_insert_all) + # noinspection PyUnresolvedReferences + self.sut.timer.start.assert_called_once() - @patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 0.5) - @patch("src.monitoring.clickhouse_batch_sender.Timer") - @patch("src.monitoring.clickhouse_batch_sender.clickhouse_connect") - def test_start_timer_with_running_timer(self, mock_clickhouse_connect, mock_timer): + def test_with_existing_timer(self): # Arrange - sut = ClickHouseBatchSender() - sut.timer = mock_timer + self.sut.timer = Mock() # Act - sut._start_timer() + with ( + patch("src.monitoring.clickhouse_batch_sender.Timer") as mock_timer, + patch("src.monitoring.clickhouse_batch_sender.BATCH_TIMEOUT", 5), + patch( + "src.monitoring.clickhouse_batch_sender.ClickHouseBatchSender.insert_all" + ) as mock_insert_all, + patch.object(self.sut.timer, "cancel") as mock_cancel, + ): + self.sut._start_timer() # Assert - mock_timer.assert_called_once_with( - 0.5, - sut.insert_all, - ) - mock_timer.cancel.assert_called_once() - sut.timer.start.assert_called_once() + mock_cancel.assert_called_once() + mock_timer.assert_called_once_with(5, mock_insert_all) + # noinspection PyUnresolvedReferences + self.sut.timer.start.assert_called_once() diff --git a/tests/miscellaneous/test_monitoring_agent.py b/tests/miscellaneous/test_monitoring_agent.py index 6994d644..6943c24c 100644 --- a/tests/miscellaneous/test_monitoring_agent.py +++ b/tests/miscellaneous/test_monitoring_agent.py @@ -49,54 +49,43 @@ def test_prepare_all_tables_with_exception( class TestInit(unittest.TestCase): - @patch("src.monitoring.monitoring_agent.ClickHouseBatchSender") - @patch("src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler") - def test_init(self, mock_kafka_consumer, mock_clickhouse): - # Arrange - expected_topics = [ - "clickhouse_server_logs", - "clickhouse_server_logs_timestamps", - 
"clickhouse_failed_dns_loglines", - "clickhouse_logline_to_batches", - "clickhouse_dns_loglines", - "clickhouse_logline_timestamps", - "clickhouse_batch_timestamps", - "clickhouse_suspicious_batches_to_batch", - "clickhouse_suspicious_batch_timestamps", - "clickhouse_alerts", - "clickhouse_fill_levels", - ] - + def test_successful(self): # Act - sut = MonitoringAgent() + with ( + patch( + "src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler" + ) as mock_simple_kafka_consume_handler, + patch( + "src.monitoring.monitoring_agent.ClickHouseBatchSender" + ) as mock_clickhouse_batch_sender, + ): + sut = MonitoringAgent() # Assert - self.assertEqual( - expected_topics, - sut.topics, + self.assertTrue( + isinstance(sut.table_names, list) + and all(isinstance(e, str) for e in sut.table_names) ) - mock_kafka_consumer.assert_called_once_with(expected_topics) + self.assertTrue(all(e.startswith("clickhouse_") for e in sut.topics)) + self.assertIsNotNone(sut.kafka_consumer) + self.assertIsNotNone(sut.batch_sender) + mock_simple_kafka_consume_handler.assert_called_once_with(sut.topics) + mock_clickhouse_batch_sender.assert_called_once() class TestStart(unittest.IsolatedAsyncioTestCase): - @patch("src.monitoring.monitoring_agent.ClickHouseBatchSender") - @patch("src.monitoring.monitoring_agent.logger") - @patch("src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler") - @patch("asyncio.get_running_loop") - async def test_handle_kafka_inputs( - self, - mock_get_running_loop, - mock_kafka_consume, - mock_logger, - mock_batch_sender, - ): + def setUp(self): + with ( + patch("src.monitoring.monitoring_agent.SimpleKafkaConsumeHandler"), + patch("src.monitoring.monitoring_agent.ClickHouseBatchSender"), + ): + self.sut = MonitoringAgent() + + async def test_successful(self): # Arrange - sut = MonitoringAgent() - sut.batch_sender = Mock() - data_schema = marshmallow_dataclass.class_schema(ServerLogs)() - fixed_id = uuid.uuid4() - timestamp_in = datetime.datetime.now() + fixed_id 
= uuid.UUID("35871c8c-ff72-44ad-a9b7-4f02cf92d484") + timestamp_in = datetime.datetime(2025, 4, 3, 12, 32, 7, 264410) value = data_schema.dumps( { "message_id": fixed_id, @@ -104,27 +93,34 @@ async def test_handle_kafka_inputs( "message_text": "test_text", } ) - - mock_loop = AsyncMock() - mock_get_running_loop.return_value = mock_loop - sut.kafka_consumer.consume.return_value = ( - "key1", + self.sut.kafka_consumer.consume.return_value = ( + "test_key", value, "clickhouse_server_logs", ) - mock_loop.run_in_executor.side_effect = [ - ("key1", value, "clickhouse_server_logs"), - KeyboardInterrupt(), - ] - # Act and Assert - await sut.start() + with patch( + "src.monitoring.monitoring_agent.asyncio.get_running_loop" + ) as mock_get_running_loop: + mock_loop = AsyncMock() + mock_get_running_loop.return_value = mock_loop + + mock_loop.run_in_executor.side_effect = [ + ("test_key", value, "clickhouse_server_logs"), + KeyboardInterrupt(), + ] - sut.batch_sender.add.assert_called_once_with( + # Act + await self.sut.start() + + # Assert + self.sut.batch_sender.add.assert_called_once_with( "server_logs", - dict( - message_id=fixed_id, timestamp_in=timestamp_in, message_text="test_text" - ), + { + "message_id": fixed_id, + "timestamp_in": timestamp_in, + "message_text": "test_text", + }, )