Skip to content

Created unittest for metadatamanager.py #954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ jobs:
- test_symbols_handler.py
- test_profile_handler.py
- test_process_manager.py
- test_metadata_manager.py
- test_host_ip_manager.py

steps:
- uses: actions/checkout@v4
Expand Down
3 changes: 2 additions & 1 deletion managers/host_ip_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def get_host_ip(self) -> Optional[str]:
def store_host_ip(self) -> Optional[str]:
"""
stores the host ip in the db
Retries to get the host IP online every 10s if not connected
recursively retries to get the host IP online every 10s if not
connected
"""
if not self.main.db.is_running_non_stop():
return
Expand Down
3 changes: 2 additions & 1 deletion managers/metadata_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def set_analysis_end_date(self, end_date):
"""
if not self.main.conf.enable_metadata():
return
end_date = utils.convert_format(datetime.now(), utils.alerts_format)

end_date = utils.convert_format(end_date, utils.alerts_format)
self.main.db.set_input_metadata({"analysis_end": end_date})

# add slips end date in the metadata dir
Expand Down
48 changes: 44 additions & 4 deletions tests/module_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
mock_open,
)
import os
from multiprocessing import Queue

from managers.host_ip_manager import HostIPManager
from managers.metadata_manager import MetadataManager
from modules.flowalerts.conn import Conn
from modules.threat_intelligence.circl_lu import Circllu
from modules.threat_intelligence.spamhaus import Spamhaus
Expand All @@ -17,7 +19,6 @@
Channels,
)
from slips_files.core.evidencehandler import EvidenceHandler

from slips_files.core.helpers.notify import Notify
from modules.flowalerts.dns import DNS
from modules.flowalerts.downloaded_file import DownloadedFile
Expand Down Expand Up @@ -50,7 +51,6 @@
from managers.process_manager import ProcessManager
from managers.redis_manager import RedisManager
from modules.ip_info.asn_info import ASN
from multiprocessing import Queue
from slips_files.core.helpers.flow_handler import FlowHandler
from modules.network_discovery.horizontal_portscan import HorizontalPortscan
from modules.network_discovery.network_discovery import NetworkDiscovery
Expand Down Expand Up @@ -339,10 +339,18 @@ def create_profiler_obj(self, mock_db):
profiler.db = mock_db
return profiler

def create_redis_manager_obj(self, main):
@patch(MODULE_DB_MANAGER, name="mock_db")
def create_redis_manager_obj(self, mock_db):
main = self.create_main_obj()
main.db = mock_db
main.args = Mock()
return RedisManager(main)

def create_host_ip_manager_obj(self, main):
@patch(MODULE_DB_MANAGER, name="mock_db")
def create_host_ip_manager_obj(self, mock_db):
main = self.create_main_obj()
main.db = mock_db
main.print = Mock()
return HostIPManager(main)

def create_utils_obj(self):
Expand Down Expand Up @@ -650,3 +658,35 @@ def create_process_manager_obj(self):
main_mock.stdout = ""
main_mock.args = Mock(growing=False, input_module=False, testing=False)
return ProcessManager(main_mock)

@patch(MODULE_DB_MANAGER, name="mock_db")
def create_metadata_manager_obj(self, mock_db):
main = self.create_main_obj()
metadata_manager = MetadataManager(main)

mock_attributes = {
"db": mock_db,
"print": MagicMock(),
"args": MagicMock(
output="/tmp/output",
config="config/slips.yaml",
filepath=MagicMock(),
),
"conf": MagicMock(
enable_metadata=MagicMock(return_value=True),
whitelist_path=MagicMock(
return_value="/path/to/whitelist.conf"
),
get_disabled_modules=MagicMock(return_value=[]),
evidence_detection_threshold=MagicMock(return_value=0.5),
),
"version": "1.0",
"input_information": "test_input",
"input_type": MagicMock(),
"zeek_dir": MagicMock(),
}

for attr, value in mock_attributes.items():
setattr(metadata_manager.main, attr, value)

return metadata_manager
97 changes: 97 additions & 0 deletions tests/test_host_ip_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import socket
from unittest.mock import MagicMock, patch, Mock
import pytest
from tests.module_factory import ModuleFactory
import sys


@pytest.mark.parametrize(
"is_interface, host_ip, modified_profiles, "
"expected_calls, expected_result",
[ # Testcase1: Should update host IP
(True, "192.168.1.1", set(), 1, "192.168.1.2"),
# Testcase2: Shouldn't update host IP
(True, "192.168.1.1", {"192.168.1.1"}, 0, "192.168.1.1"),
# Testcase3: Shouldn't update host IP (not interface)
(False, "192.168.1.1", set(), 0, None),
],
)
def test_update_host_ip(
is_interface,
host_ip,
modified_profiles,
expected_calls,
expected_result,
):
host_ip_man = ModuleFactory().create_host_ip_manager_obj()
host_ip_man.main.db.is_running_non_stop.return_value = is_interface

host_ip_man.get_host_ip = Mock()
host_ip_man.get_host_ip.return_value = "192.168.1.2"
host_ip_man.main.db.set_host_ip = MagicMock()
result = host_ip_man.update_host_ip(host_ip, modified_profiles)
assert result == expected_result
assert host_ip_man.get_host_ip.call_count == expected_calls


def test_get_host_ip_success():
host_ip_man = ModuleFactory().create_host_ip_manager_obj()
expected_ip = "192.168.1.100"

with patch("socket.socket") as mock_socket:
mock_instance = MagicMock()
mock_socket.return_value = mock_instance

mock_instance.getsockname.return_value = (expected_ip, 80)

result = host_ip_man.get_host_ip()

assert result == expected_ip
mock_instance.connect.assert_any_call(("1.1.1.1", 80))
mock_instance.getsockname.assert_called_once()


def test_get_host_ip_failure():
host_ip_man = ModuleFactory().create_host_ip_manager_obj()

with patch("socket.socket") as mock_socket:
mock_instance = MagicMock()
mock_socket.return_value = mock_instance

mock_instance.connect.side_effect = socket.error()

result = host_ip_man.get_host_ip()

assert result is None
mock_instance.connect.assert_any_call(("1.1.1.1", 80))
mock_instance.connect.assert_any_call(("2606:4700:4700::1111", 80))
mock_instance.getsockname.assert_not_called()


@pytest.mark.parametrize(
"running_on_interface, host_ip,"
"set_host_ip_side_effect, expected_result",
[
# testcase1: Running on interface, valid IP
(True, "192.168.1.100", None, "192.168.1.100"),
# testcase2: Not running on interface
(False, "192.168.1.100", None, None),
],
)
def test_store_host_ip(
running_on_interface,
host_ip,
set_host_ip_side_effect,
expected_result,
):
host_ip_man = ModuleFactory().create_host_ip_manager_obj()
host_ip_man.main.db.is_running_non_stop.return_value = running_on_interface
host_ip_man.get_host_ip = MagicMock(return_value=host_ip)
host_ip_man.main.db.set_host_ip = MagicMock(
side_effect=set_host_ip_side_effect
)

with patch.object(sys, "argv", ["-i"] if running_on_interface else []):
with patch("time.sleep"):
result = host_ip_man.store_host_ip()
assert result == expected_result
31 changes: 0 additions & 31 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,37 +141,6 @@ def test_update_stats(mode, time_diff, expected_calls):
assert mock_print.call_count == expected_calls


@pytest.mark.parametrize(
"is_interface, host_ip, modified_profiles, "
"expected_calls, expected_result",
[ # Testcase1: Should update host IP
(True, "192.168.1.1", set(), 1, "192.168.1.2"),
# Testcase2: Shouldn't update host IP
(True, "192.168.1.1", {"192.168.1.1"}, 0, "192.168.1.1"),
# Testcase3: Shouldn't update host IP (not interface)
(False, "192.168.1.1", set(), 0, None),
],
)
def test_update_host_ip(
is_interface,
host_ip,
modified_profiles,
expected_calls,
expected_result,
):
main = ModuleFactory().create_main_obj()
main.db = Mock()
host_ip_man = ModuleFactory().create_host_ip_manager_obj(main)
host_ip_man.main.db.is_running_non_stop.return_value = is_interface

host_ip_man.get_host_ip = Mock()
host_ip_man.get_host_ip.return_value = "192.168.1.2"
host_ip_man.main.db.set_host_ip = MagicMock()
result = host_ip_man.update_host_ip(host_ip, modified_profiles)
assert result == expected_result
assert host_ip_man.get_host_ip.call_count == expected_calls


@pytest.mark.parametrize(
"args_verbose, conf_verbose, args_debug, conf_debug, "
"expected_verbose, expected_debug",
Expand Down
Loading
Loading