From 92d9c0e733b8afaa946d87257a8dc2d75294fd82 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Fri, 9 May 2025 11:51:29 +0200 Subject: [PATCH 1/2] feat: added span filtering for kafka-python module Signed-off-by: Cagri Yonca --- pyproject.toml | 11 +- src/instana/agent/host.py | 42 +- src/instana/options.py | 28 +- src/instana/util/config.py | 98 ++++- src/instana/util/config_reader.py | 22 + src/instana/util/span_utils.py | 16 +- tests/agent/test_host.py | 30 +- tests/clients/boto3/test_boto3_dynamodb.py | 2 +- tests/clients/kafka/test_kafka_python.py | 126 +++++- tests/requirements-kafka.txt | 2 +- tests/test_options.py | 457 ++++++++++++--------- tests/util/test_config.py | 102 +++-- tests/util/test_config_reader.py | 63 +++ tests/util/test_configuration-1.yaml | 19 + tests/util/test_configuration-2.yaml | 19 + tests/util/test_span_utils.py | 21 +- 16 files changed, 731 insertions(+), 327 deletions(-) create mode 100644 src/instana/util/config_reader.py create mode 100644 tests/util/test_config_reader.py create mode 100644 tests/util/test_configuration-1.yaml create mode 100644 tests/util/test_configuration-2.yaml diff --git a/pyproject.toml b/pyproject.toml index 1f12cb6e..31c184b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dependencies = [ "opentelemetry-api>=1.27.0", "opentelemetry-semantic-conventions>=0.48b0", "typing_extensions>=4.12.2", + "pyyaml>=6.0.2", ] [project.entry-points."instana"] @@ -59,11 +60,11 @@ string = "instana:load" [project.optional-dependencies] dev = [ - "pytest", - "pytest-cov", - "pytest-mock", - "pre-commit>=3.0.0", - "ruff" + "pytest", + "pytest-cov", + "pytest-mock", + "pre-commit>=3.0.0", + "ruff", ] [project.urls] diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 8e03833b..ee0e1d79 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -22,7 +22,7 @@ from instana.options import StandardOptions from instana.util import to_json from instana.util.runtime import get_py_source -from instana.util.span_utils import get_operation_specifier +from instana.util.span_utils import get_operation_specifiers from instana.version import VERSION @@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: Filters given span list using ignore-endpoint variable and returns the list of filtered spans. """ filtered_spans = [] + endpoint = "" for span in spans: if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"): service = span.n - operation_specifier = get_operation_specifier(service) - endpoint = span.data[service][operation_specifier] - if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored( - service, endpoint + operation_specifier_key, service_specifier_key = ( + get_operation_specifiers(service) + ) + if service == "kafka": + endpoint = span.data[service][service_specifier_key] + method = span.data[service][operation_specifier_key] + if isinstance(method, str) and self.__is_endpoint_ignored( + service, method, endpoint ): continue else: @@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: filtered_spans.append(span) return filtered_spans - def __is_service_or_endpoint_ignored( - self, service: str, endpoint: str = "" + def __is_endpoint_ignored( + self, + service: str, + method: str = "", + endpoint: str = "", ) -> bool: """Check if the given service and endpoint combination should be ignored.""" - - return ( - service.lower() in self.options.ignore_endpoints - or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints - ) + service = service.lower() + method = method.lower() + endpoint = endpoint.lower() + filter_rules = [ + f"{service}.{method}", # service.method + f"{service}.*", # service.* + ] + + if service == "kafka" and endpoint: + filter_rules += [ + f"{service}.{method}.{endpoint}", # service.method.endpoint + f"{service}.*.{endpoint}", # service.*.endpoint + f"{service}.{method}.*", # service.method.* + ] + return any(rule in self.options.ignore_endpoints for rule in filter_rules) def handle_agent_tasks(self, task: Dict[str, Any]) -> None: """ diff --git a/src/instana/options.py b/src/instana/options.py index dee797d7..b055fb09 100644 --- a/src/instana/options.py +++ b/src/instana/options.py @@ -19,7 +19,10 @@ from typing import Any, Dict from instana.log import logger -from instana.util.config import parse_ignored_endpoints +from instana.util.config import ( + parse_ignored_endpoints, + parse_ignored_endpoints_from_yaml, +) from instana.util.runtime import determine_service_name from instana.configurator import config @@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";") ) - if "INSTANA_IGNORE_ENDPOINTS" in os.environ: - self.ignore_endpoints = parse_ignored_endpoints( - os.environ["INSTANA_IGNORE_ENDPOINTS"] + if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ: + self.ignore_endpoints = parse_ignored_endpoints_from_yaml( + os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] ) else: - if ( - isinstance(config.get("tracing"), dict) - and "ignore_endpoints" in config["tracing"] - ): + if "INSTANA_IGNORE_ENDPOINTS" in os.environ: self.ignore_endpoints = parse_ignored_endpoints( - config["tracing"]["ignore_endpoints"], + os.environ["INSTANA_IGNORE_ENDPOINTS"] ) + else: + if ( + isinstance(config.get("tracing"), dict) + and "ignore_endpoints" in config["tracing"] + ): + self.ignore_endpoints = parse_ignored_endpoints( + config["tracing"]["ignore_endpoints"], + ) if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1": self.allow_exit_as_root = True @@ -141,7 +149,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None: """ if not res_data or not isinstance(res_data, dict): logger.debug(f"options.set_from: Wrong data type - {type(res_data)}") - return + return if "secrets" in res_data: self.set_secrets(res_data["secrets"]) diff --git a/src/instana/util/config.py b/src/instana/util/config.py index c8f6d1f9..b53f8177 100644 --- a/src/instana/util/config.py +++ b/src/instana/util/config.py @@ -1,5 +1,11 @@ +# (c) Copyright IBM Corp. 2025 + +import itertools +import os from typing import Any, Dict, List, Union + from instana.log import logger +from instana.util.config_reader import ConfigReader def parse_service_pair(pair: str) -> List[str]: @@ -7,29 +13,29 @@ def parse_service_pair(pair: str) -> List[str]: Parses a pair string to prepare a list of ignored endpoints. @param pair: String format: - - "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1" - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - "service1:method1,method2" or "service1:method1" or "service1" + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ pair_list = [] if ":" in pair: - service, endpoints = pair.split(":", 1) + service, methods = pair.split(":", 1) service = service.strip() - endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()] + method_list = [ep.strip() for ep in methods.split(",") if ep.strip()] - for endpoint in endpoint_list: - pair_list.append(f"{service}.{endpoint}") + for method in method_list: + pair_list.append(f"{service}.{method}") else: - pair_list.append(pair) + pair_list.append(f"{pair}.*") return pair_list -def parse_ignored_endpoints_string(params: str) -> List[str]: +def parse_ignored_endpoints_string(params: Union[str, os.PathLike]) -> List[str]: """ Parses a string to prepare a list of ignored endpoints. @param params: String format: - - "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - "service1:method1,method2;service2:method3" or "service1;service2" + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ ignore_endpoints = [] if params: @@ -46,18 +52,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]: Parses a dictionary to prepare a list of ignored endpoints. @param params: Dict format: - - {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - {"service1": ["method1", "method2"], "service2": ["method3"]} + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ ignore_endpoints = [] - for service, endpoints in params.items(): - if not endpoints: # filtering all service - ignore_endpoints.append(service.lower()) + for service, methods in params.items(): + if not methods: # filtering all service + ignore_endpoints.append(f"{service.lower()}.*") else: # filtering specific endpoints - for endpoint in endpoints: - ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}") + ignore_endpoints = parse_endpoints_of_service( + ignore_endpoints, service, methods + ) + + return ignore_endpoints + +def parse_endpoints_of_service( + ignore_endpoints: List[str], + service: str, + methods: Union[str, List[str]], +) -> List[str]: + """ + Parses endpoints of each service. + + @param ignore_endpoints: A list of rules for endpoints to be filtered. + @param service: The name of the service to be filtered. + @param methods: A list of specific endpoints of the service to be filtered. + """ + if service == "kafka" and isinstance(methods, list): + for rule in methods: + for method, endpoint in itertools.product( + rule["methods"], rule["endpoints"] + ): + ignore_endpoints.append( + f"{service.lower()}.{method.lower()}.{endpoint.lower()}" + ) + else: + for method in methods: + ignore_endpoints.append(f"{service.lower()}.{method.lower()}") return ignore_endpoints @@ -66,9 +99,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: Parses input to prepare a list for ignored endpoints. @param params: Can be either: - - String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" - - Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - String: "service1:method1,method2;service2:method3" or "service1;service2" + - Dict: {"service1": ["method1", "method2"], "service2": ["method3"]} + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ try: if isinstance(params, str): @@ -80,3 +113,28 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: except Exception as e: logger.debug("Error parsing ignored endpoints: %s", str(e)) return [] + + +def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]: + """ + Parses configuration yaml file and prepares a list of ignored endpoints. + + @param file_path: Path of the file as a string + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"] + """ + config_reader = ConfigReader(file_path) + ignore_endpoints_dict = None + if "tracing" in config_reader.data: + ignore_endpoints_dict = config_reader.data["tracing"].get("ignore-endpoints") + elif "com.instana.tracing" in config_reader.data: + logger.warning( + 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' + ) + ignore_endpoints_dict = config_reader.data["com.instana.tracing"].get( + "ignore-endpoints" + ) + if ignore_endpoints_dict: + ignored_endpoints = parse_ignored_endpoints(ignore_endpoints_dict) + return ignored_endpoints + else: + return [] diff --git a/src/instana/util/config_reader.py b/src/instana/util/config_reader.py new file mode 100644 index 00000000..ddec31ec --- /dev/null +++ b/src/instana/util/config_reader.py @@ -0,0 +1,22 @@ +# (c) Copyright IBM Corp. 2025 + +from typing import Union +from instana.log import logger +import yaml + + +class ConfigReader: + def __init__(self, file_path: Union[str]) -> None: + self.file_path = file_path + self.data = None + self.load_file() + + def load_file(self) -> None: + """Loads and parses the YAML file""" + try: + with open(self.file_path, "r") as file: + self.data = yaml.safe_load(file) + except FileNotFoundError: + logger.error(f"Configuration file has not found: {self.file_path}") + except yaml.YAMLError as e: + logger.error(f"Error parsing YAML file: {e}") diff --git a/src/instana/util/span_utils.py b/src/instana/util/span_utils.py index 34049759..2dda4759 100644 --- a/src/instana/util/span_utils.py +++ b/src/instana/util/span_utils.py @@ -1,13 +1,17 @@ # (c) Copyright IBM Corp. 2025 -from typing import Optional +from typing import Tuple -def get_operation_specifier(span_name: str) -> Optional[str]: +def get_operation_specifiers(span_name: str) -> Tuple[str, str]: """Get the specific operation specifier for the given span.""" - operation_specifier = "" + operation_specifier_key = "" + service_specifier_key = "" if span_name == "redis": - operation_specifier = "command" + operation_specifier_key = "command" elif span_name == "dynamodb": - operation_specifier = "op" - return operation_specifier + operation_specifier_key = "op" + elif span_name == "kafka": + operation_specifier_key = "access" + service_specifier_key = "service" + return operation_specifier_key, service_specifier_key diff --git a/tests/agent/test_host.py b/tests/agent/test_host.py index 4ec2647d..93b89c0a 100644 --- a/tests/agent/test_host.py +++ b/tests/agent/test_host.py @@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None: assert "should_send_snapshot_data: True" in caplog.messages def test_is_service_or_endpoint_ignored(self) -> None: - self.agent.options.ignore_endpoints.append("service1") - self.agent.options.ignore_endpoints.append("service2.endpoint1") + self.agent.options.ignore_endpoints.append("service1.*") + self.agent.options.ignore_endpoints.append("service2.method1") # ignore all endpoints of service1 - assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1") - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "endpoint1" - ) - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "endpoint2" - ) + assert self.agent._HostAgent__is_endpoint_ignored("service1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2") # case-insensitive - assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1") - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "ENDPOINT1" - ) + assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1") # ignore only endpoint1 of service2 - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service2", "endpoint1" - ) - assert not self.agent._HostAgent__is_service_or_endpoint_ignored( - "service2", "endpoint2" - ) + assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1") + assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2") # don't ignore other services - assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3") + assert not self.agent._HostAgent__is_endpoint_ignored("service3") diff --git a/tests/clients/boto3/test_boto3_dynamodb.py b/tests/clients/boto3/test_boto3_dynamodb.py index bb427e64..55f09df6 100644 --- a/tests/clients/boto3/test_boto3_dynamodb.py +++ b/tests/clients/boto3/test_boto3_dynamodb.py @@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None: assert dynamodb_span not in filtered_spans def test_ignore_create_table(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable" + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable" agent.options = StandardOptions() with tracer.start_as_current_span("test"): diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py index 5999ef09..6f4adea8 100644 --- a/tests/clients/kafka/test_kafka_python.py +++ b/tests/clients/kafka/test_kafka_python.py @@ -1,5 +1,6 @@ # (c) Copyright IBM Corp. 2025 +import os from typing import Generator import pytest @@ -8,7 +9,9 @@ from kafka.errors import TopicAlreadyExistsError from opentelemetry.trace import SpanKind +from instana.options import StandardOptions from instana.singletons import agent, tracer +from instana.util.config import parse_ignored_endpoints_from_yaml from tests.helpers import get_first_span_by_filter, testenv @@ -57,7 +60,7 @@ def test_trace_kafka_python_send(self) -> None: with tracer.start_as_current_span("test"): future = self.producer.send(testenv["kafka_topic"], b"raw_bytes") - record_metadata = future.get(timeout=10) # noqa: F841 + _ = future.get(timeout=10) # noqa: F841 spans = self.recorder.queued_spans() assert len(spans) == 2 @@ -203,6 +206,127 @@ def test_trace_kafka_python_error(self) -> None: assert kafka_span.data["kafka"]["access"] == "consume" assert kafka_span.data["kafka"]["error"] == "StopIteration()" + def consume_from_topic(self, topic_name: str) -> None: + consumer = KafkaConsumer( + topic_name, + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", + enable_auto_commit=False, + consumer_timeout_ms=1000, + ) + with tracer.start_as_current_span("test"): + for msg in consumer: + if msg is None: + break + + consumer.close() + + def test_ignore_kafka(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka" + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test"): + self.producer.send(testenv["kafka_topic"], b"raw_bytes") + self.producer.flush() + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + def test_ignore_kafka_producer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:send" + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer = KafkaConsumer( + testenv["kafka_topic"], + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", + enable_auto_commit=False, + consumer_timeout_ms=1000, + ) + for msg in consumer: + if msg is None: + break + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @pytest.mark.flaky(reruns=3) + def test_ignore_kafka_consumer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + agent.options = StandardOptions() + + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 4 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @pytest.mark.flaky(reruns=5) + def test_ignore_specific_topic(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] = ( + "tests/util/test_configuration-1.yaml" + ) + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 6 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 3 + + def test_ignore_specific_topic_with_config_file(self) -> None: + agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + def test_kafka_consumer_root_exit(self) -> None: agent.options.allow_exit_as_root = True diff --git a/tests/requirements-kafka.txt b/tests/requirements-kafka.txt index 2451489a..640d2ad9 100644 --- a/tests/requirements-kafka.txt +++ b/tests/requirements-kafka.txt @@ -1,5 +1,5 @@ -r requirements-minimal.txt mock>=2.0.0 +confluent-kafka>=2.0.0 kafka-python>=2.0.0; python_version < "3.12" kafka-python-ng>=2.0.0; python_version >= "3.12" -confluent-kafka>=2.0.0 \ No newline at end of file diff --git a/tests/test_options.py b/tests/test_options.py index 025ff092..43ac2d41 100644 --- a/tests/test_options.py +++ b/tests/test_options.py @@ -1,7 +1,10 @@ +# (c) Copyright IBM Corp. 2025 + import logging import os from typing import Generator +from mock import patch import pytest from instana.configurator import config @@ -38,6 +41,7 @@ def clean_env_vars(): class TestBaseOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.base_options = None yield clean_env_vars() if "tracing" in config.keys(): @@ -46,147 +50,173 @@ def _resource(self) -> Generator[None, None, None]: def test_base_options(self) -> None: if "INSTANA_DEBUG" in os.environ: del os.environ["INSTANA_DEBUG"] - test_base_options = BaseOptions() + self.base_options = BaseOptions() - assert not test_base_options.debug - assert test_base_options.log_level == logging.WARN - assert not test_base_options.extra_http_headers - assert not test_base_options.allow_exit_as_root - assert not test_base_options.ignore_endpoints - assert test_base_options.secrets_matcher == "contains-ignore-case" - assert test_base_options.secrets_list == ["key", "pass", "secret"] - assert not test_base_options.secrets + assert not self.base_options.debug + assert self.base_options.log_level == logging.WARN + assert not self.base_options.extra_http_headers + assert not self.base_options.allow_exit_as_root + assert not self.base_options.ignore_endpoints + assert self.base_options.secrets_matcher == "contains-ignore-case" + assert self.base_options.secrets_list == ["key", "pass", "secret"] + assert not self.base_options.secrets def test_base_options_with_config(self) -> None: - config["tracing"]["ignore_endpoints"] = "service1;service3:endpoint1,endpoint2" - test_base_options = BaseOptions() - assert test_base_options.ignore_endpoints == [ - "service1", - "service3.endpoint1", - "service3.endpoint2", + config["tracing"]["ignore_endpoints"] = "service1;service3:method1,method2" + self.base_options = BaseOptions() + assert self.base_options.ignore_endpoints == [ + "service1.*", + "service3.method1", + "service3.method2", ] + @patch.dict( + os.environ, + { + "INSTANA_DEBUG": "true", + "INSTANA_EXTRA_HTTP_HEADERS": "SOMETHING;HERE", + "INSTANA_IGNORE_ENDPOINTS": "service1;service2:method1,method2", + "INSTANA_SECRETS": "secret1:username,password", + }, + ) def test_base_options_with_env_vars(self) -> None: - os.environ["INSTANA_DEBUG"] = "true" - os.environ["INSTANA_EXTRA_HTTP_HEADERS"] = "SOMETHING;HERE" - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "service1;service2:endpoint1,endpoint2" - os.environ["INSTANA_SECRETS"] = "secret1:username,password" - - test_base_options = BaseOptions() - assert test_base_options.log_level == logging.DEBUG - assert test_base_options.debug + self.base_options = BaseOptions() + assert self.base_options.log_level == logging.DEBUG + assert self.base_options.debug - assert test_base_options.extra_http_headers == ["something", "here"] + assert self.base_options.extra_http_headers == ["something", "here"] - assert test_base_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + assert self.base_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", ] - assert test_base_options.secrets_matcher == "secret1" - assert test_base_options.secrets_list == ["username", "password"] + assert self.base_options.secrets_matcher == "secret1" + assert self.base_options.secrets_list == ["username", "password"] + + @patch.dict( + os.environ, + {"INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml"}, + ) + def test_base_options_with_endpoint_file(self) -> None: + self.base_options = BaseOptions() + assert self.base_options.ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + del self.base_options class TestStandardOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.standart_options = None yield clean_env_vars() if "tracing" in config.keys(): del config["tracing"] def test_standard_options(self) -> None: - test_standard_options = StandardOptions() + self.standart_options = StandardOptions() - assert test_standard_options.AGENT_DEFAULT_HOST == "localhost" - assert test_standard_options.AGENT_DEFAULT_PORT == 42699 + assert self.standart_options.AGENT_DEFAULT_HOST == "localhost" + assert self.standart_options.AGENT_DEFAULT_PORT == 42699 def test_set_secrets(self) -> None: - test_standard_options = StandardOptions() + self.standart_options = StandardOptions() test_secrets = {"matcher": "sample-match", "list": ["sample", "list"]} - test_standard_options.set_secrets(test_secrets) - assert test_standard_options.secrets_matcher == "sample-match" - assert test_standard_options.secrets_list == ["sample", "list"] + self.standart_options.set_secrets(test_secrets) + assert self.standart_options.secrets_matcher == "sample-match" + assert self.standart_options.secrets_list == ["sample", "list"] def test_set_extra_headers(self) -> None: - test_standard_options = StandardOptions() + self.standart_options = StandardOptions() test_headers = {"header1": "sample-match", "header2": ["sample", "list"]} - test_standard_options.set_extra_headers(test_headers) - assert test_standard_options.extra_http_headers == test_headers + self.standart_options.set_extra_headers(test_headers) + assert self.standart_options.extra_http_headers == test_headers def test_set_tracing(self) -> None: - test_standard_options = StandardOptions() + self.standart_options = StandardOptions() - test_tracing = {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"} - test_standard_options.set_tracing(test_tracing) + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} + self.standart_options.set_tracing(test_tracing) - assert test_standard_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + assert self.standart_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", ] - assert not test_standard_options.extra_http_headers + assert not self.standart_options.extra_http_headers + @patch.dict( + os.environ, + {"INSTANA_IGNORE_ENDPOINTS": "env_service1;env_service2:method1,method2"}, + ) def test_set_tracing_priority(self) -> None: - # Environment variables > In-code Configuration > Agent Configuration - # First test when all attributes given - os.environ["INSTANA_IGNORE_ENDPOINTS"] = ( - "env_service1;env_service2:endpoint1,endpoint2" - ) config["tracing"]["ignore_endpoints"] = ( - "config_service1;config_service2:endpoint1,endpoint2" + "config_service1;config_service2:method1,method2" ) - test_tracing = {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"} + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} - test_standard_options = StandardOptions() - test_standard_options.set_tracing(test_tracing) + self.standart_options = StandardOptions() + self.standart_options.set_tracing(test_tracing) - assert test_standard_options.ignore_endpoints == [ - "env_service1", - "env_service2.endpoint1", - "env_service2.endpoint2", + assert self.standart_options.ignore_endpoints == [ + "env_service1.*", + "env_service2.method1", + "env_service2.method2", ] # Second test when In-code configuration and Agent configuration given del os.environ["INSTANA_IGNORE_ENDPOINTS"] - test_standard_options = StandardOptions() - test_standard_options.set_tracing(test_tracing) + self.standart_options = StandardOptions() + self.standart_options.set_tracing(test_tracing) - assert test_standard_options.ignore_endpoints == [ - "config_service1", - "config_service2.endpoint1", - "config_service2.endpoint2", + assert self.standart_options.ignore_endpoints == [ + "config_service1.*", + "config_service2.method1", + "config_service2.method2", ] def test_set_from(self) -> None: - test_standard_options = StandardOptions() + self.standart_options = StandardOptions() test_res_data = { "secrets": {"matcher": "sample-match", "list": ["sample", "list"]}, - "tracing": {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"}, + "tracing": {"ignore-endpoints": "service1;service2:method1,method2"}, } - test_standard_options.set_from(test_res_data) + self.standart_options.set_from(test_res_data) assert ( - test_standard_options.secrets_matcher == test_res_data["secrets"]["matcher"] + self.standart_options.secrets_matcher == test_res_data["secrets"]["matcher"] ) - assert test_standard_options.secrets_list == test_res_data["secrets"]["list"] - assert test_standard_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + assert self.standart_options.secrets_list == test_res_data["secrets"]["list"] + assert self.standart_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", ] test_res_data = { "extraHeaders": {"header1": "sample-match", "header2": ["sample", "list"]}, } - test_standard_options.set_from(test_res_data) + self.standart_options.set_from(test_res_data) - assert test_standard_options.extra_http_headers == test_res_data["extraHeaders"] + assert self.standart_options.extra_http_headers == test_res_data["extraHeaders"] def test_set_from_bool( self, @@ -195,9 +225,9 @@ def test_set_from_bool( caplog.set_level(logging.DEBUG, logger="instana") caplog.clear() - test_standard_options = StandardOptions() + self.standart_options = StandardOptions() test_res_data = True - test_standard_options.set_from(test_res_data) + self.standart_options.set_from(test_res_data) assert len(caplog.messages) == 1 assert len(caplog.records) == 1 @@ -205,180 +235,201 @@ def test_set_from_bool( "options.set_from: Wrong data type - " in caplog.messages[0] ) - assert test_standard_options.secrets_list == ["key", "pass", "secret"] - assert test_standard_options.ignore_endpoints == [] - assert not test_standard_options.extra_http_headers + assert self.standart_options.secrets_list == ["key", "pass", "secret"] + assert self.standart_options.ignore_endpoints == [] + assert not self.standart_options.extra_http_headers class TestServerlessOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.serverless_options = None yield clean_env_vars() def test_serverless_options(self) -> None: - test_serverless_options = ServerlessOptions() - - assert not test_serverless_options.debug - assert test_serverless_options.log_level == logging.WARN - assert not test_serverless_options.extra_http_headers - assert not test_serverless_options.allow_exit_as_root - assert not test_serverless_options.ignore_endpoints - assert test_serverless_options.secrets_matcher == "contains-ignore-case" - assert test_serverless_options.secrets_list == ["key", "pass", "secret"] - assert not test_serverless_options.secrets - assert not test_serverless_options.agent_key - assert not test_serverless_options.endpoint_url - assert test_serverless_options.ssl_verify - assert not test_serverless_options.endpoint_proxy - assert test_serverless_options.timeout == 0.8 - + self.serverless_options = ServerlessOptions() + + assert not self.serverless_options.debug + assert self.serverless_options.log_level == logging.WARN + assert not self.serverless_options.extra_http_headers + assert not self.serverless_options.allow_exit_as_root + assert not self.serverless_options.ignore_endpoints + assert self.serverless_options.secrets_matcher == "contains-ignore-case" + assert self.serverless_options.secrets_list == ["key", "pass", "secret"] + assert not self.serverless_options.secrets + assert not self.serverless_options.agent_key + assert not self.serverless_options.endpoint_url + assert self.serverless_options.ssl_verify + assert not self.serverless_options.endpoint_proxy + assert self.serverless_options.timeout == 0.8 + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + }, + ) def test_serverless_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - - test_serverless_options = ServerlessOptions() + self.serverless_options = ServerlessOptions() - assert test_serverless_options.agent_key == "key1" - assert test_serverless_options.endpoint_url == "localhost" - assert not test_serverless_options.ssl_verify - assert test_serverless_options.endpoint_proxy == {"https": "proxy1"} - assert test_serverless_options.timeout == 3 - assert test_serverless_options.log_level == logging.INFO + assert self.serverless_options.agent_key == "key1" + assert self.serverless_options.endpoint_url == "localhost" + assert not self.serverless_options.ssl_verify + assert self.serverless_options.endpoint_proxy == {"https": "proxy1"} + assert self.serverless_options.timeout == 3 + assert self.serverless_options.log_level == logging.INFO class TestAWSLambdaOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.aws_lambda_options = None yield clean_env_vars() def test_aws_lambda_options(self) -> None: - test_aws_lambda_options = AWSLambdaOptions() + self.aws_lambda_options = AWSLambdaOptions() - assert not test_aws_lambda_options.agent_key - assert not test_aws_lambda_options.endpoint_url - assert test_aws_lambda_options.ssl_verify - assert not test_aws_lambda_options.endpoint_proxy - assert test_aws_lambda_options.timeout == 0.8 - assert test_aws_lambda_options.log_level == logging.WARN + assert not self.aws_lambda_options.agent_key + assert not self.aws_lambda_options.endpoint_url + assert self.aws_lambda_options.ssl_verify + assert not self.aws_lambda_options.endpoint_proxy + assert self.aws_lambda_options.timeout == 0.8 + assert self.aws_lambda_options.log_level == logging.WARN class TestAWSFargateOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.aws_fargate_options = None yield clean_env_vars() def test_aws_fargate_options(self) -> None: - test_aws_fargate_options = AWSFargateOptions() - - assert not test_aws_fargate_options.agent_key - assert not test_aws_fargate_options.endpoint_url - assert test_aws_fargate_options.ssl_verify - assert not test_aws_fargate_options.endpoint_proxy - assert test_aws_fargate_options.timeout == 0.8 - assert test_aws_fargate_options.log_level == logging.WARN - assert not test_aws_fargate_options.tags - assert not test_aws_fargate_options.zone - + self.aws_fargate_options = AWSFargateOptions() + + assert not self.aws_fargate_options.agent_key + assert not self.aws_fargate_options.endpoint_url + assert self.aws_fargate_options.ssl_verify + assert not self.aws_fargate_options.endpoint_proxy + assert self.aws_fargate_options.timeout == 0.8 + assert self.aws_fargate_options.log_level == logging.WARN + assert not self.aws_fargate_options.tags + assert not self.aws_fargate_options.zone + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + "INSTANA_TAGS": "key1=value1,key2=value2", + "INSTANA_ZONE": "zone1", + }, + ) def test_aws_fargate_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - os.environ["INSTANA_TAGS"] = "key1=value1,key2=value2" - os.environ["INSTANA_ZONE"] = "zone1" - - test_aws_fargate_options = AWSFargateOptions() + self.aws_fargate_options = AWSFargateOptions() - assert test_aws_fargate_options.agent_key == "key1" - assert test_aws_fargate_options.endpoint_url == "localhost" - assert not test_aws_fargate_options.ssl_verify - assert test_aws_fargate_options.endpoint_proxy == {"https": "proxy1"} - assert test_aws_fargate_options.timeout == 3 - assert test_aws_fargate_options.log_level == logging.INFO + assert self.aws_fargate_options.agent_key == "key1" + assert self.aws_fargate_options.endpoint_url == "localhost" + assert not self.aws_fargate_options.ssl_verify + assert self.aws_fargate_options.endpoint_proxy == {"https": "proxy1"} + assert self.aws_fargate_options.timeout == 3 + assert self.aws_fargate_options.log_level == logging.INFO - assert test_aws_fargate_options.tags == {"key1": "value1", "key2": "value2"} - assert test_aws_fargate_options.zone == "zone1" + assert self.aws_fargate_options.tags == {"key1": "value1", "key2": "value2"} + assert self.aws_fargate_options.zone == "zone1" class TestEKSFargateOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.eks_fargate_options = None yield clean_env_vars() def test_eks_fargate_options(self) -> None: - test_eks_fargate_options = EKSFargateOptions() - - assert not test_eks_fargate_options.agent_key - assert not test_eks_fargate_options.endpoint_url - assert test_eks_fargate_options.ssl_verify - assert not test_eks_fargate_options.endpoint_proxy - assert test_eks_fargate_options.timeout == 0.8 - assert test_eks_fargate_options.log_level == logging.WARN - + self.eks_fargate_options = EKSFargateOptions() + + assert not self.eks_fargate_options.agent_key + assert not self.eks_fargate_options.endpoint_url + assert self.eks_fargate_options.ssl_verify + assert not self.eks_fargate_options.endpoint_proxy + assert self.eks_fargate_options.timeout == 0.8 + assert self.eks_fargate_options.log_level == logging.WARN + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + }, + ) def test_eks_fargate_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - - test_eks_fargate_options = EKSFargateOptions() + self.eks_fargate_options = EKSFargateOptions() - assert test_eks_fargate_options.agent_key == "key1" - assert test_eks_fargate_options.endpoint_url == "localhost" - assert not test_eks_fargate_options.ssl_verify - assert test_eks_fargate_options.endpoint_proxy == {"https": "proxy1"} - assert test_eks_fargate_options.timeout == 3 - assert test_eks_fargate_options.log_level == logging.INFO + assert self.eks_fargate_options.agent_key == "key1" + assert self.eks_fargate_options.endpoint_url == "localhost" + assert not self.eks_fargate_options.ssl_verify + assert self.eks_fargate_options.endpoint_proxy == {"https": "proxy1"} + assert self.eks_fargate_options.timeout == 3 + assert self.eks_fargate_options.log_level == logging.INFO class TestGCROptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.gcr_options = None yield clean_env_vars() def test_gcr_options(self) -> None: - test_gcr_options = GCROptions() - - assert not test_gcr_options.debug - assert test_gcr_options.log_level == logging.WARN - assert not test_gcr_options.extra_http_headers - assert not test_gcr_options.allow_exit_as_root - assert not test_gcr_options.ignore_endpoints - assert test_gcr_options.secrets_matcher == "contains-ignore-case" - assert test_gcr_options.secrets_list == ["key", "pass", "secret"] - assert not test_gcr_options.secrets - assert not test_gcr_options.agent_key - assert not test_gcr_options.endpoint_url - assert test_gcr_options.ssl_verify - assert not test_gcr_options.endpoint_proxy - assert test_gcr_options.timeout == 0.8 - + self.gcr_options = GCROptions() + + assert not self.gcr_options.debug + assert self.gcr_options.log_level == logging.WARN + assert not self.gcr_options.extra_http_headers + assert not self.gcr_options.allow_exit_as_root + assert not self.gcr_options.ignore_endpoints + assert self.gcr_options.secrets_matcher == "contains-ignore-case" + assert self.gcr_options.secrets_list == ["key", "pass", "secret"] + assert not self.gcr_options.secrets + assert not self.gcr_options.agent_key + assert not self.gcr_options.endpoint_url + assert self.gcr_options.ssl_verify + assert not self.gcr_options.endpoint_proxy + assert self.gcr_options.timeout == 0.8 + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + }, + ) def test_gcr_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - - test_gcr_options = GCROptions() - - assert test_gcr_options.agent_key == "key1" - assert test_gcr_options.endpoint_url == "localhost" - assert not test_gcr_options.ssl_verify - assert test_gcr_options.endpoint_proxy == {"https": "proxy1"} - assert test_gcr_options.timeout == 3 - assert test_gcr_options.log_level == logging.INFO + self.gcr_options = GCROptions() + + assert self.gcr_options.agent_key == "key1" + assert self.gcr_options.endpoint_url == "localhost" + assert not self.gcr_options.ssl_verify + assert self.gcr_options.endpoint_proxy == {"https": "proxy1"} + assert self.gcr_options.timeout == 3 + assert self.gcr_options.log_level == logging.INFO diff --git a/tests/util/test_config.py b/tests/util/test_config.py index 891007e8..908035d7 100644 --- a/tests/util/test_config.py +++ b/tests/util/test_config.py @@ -1,8 +1,7 @@ -from typing import Generator - -import pytest +# (c) Copyright IBM Corp. 2025 from instana.util.config import ( + parse_endpoints_of_service, parse_ignored_endpoints, parse_ignored_endpoints_dict, parse_service_pair, @@ -10,33 +9,29 @@ class TestConfig: - @pytest.fixture(autouse=True) - def _resource(self) -> Generator[None, None, None]: - yield - def test_parse_service_pair(self) -> None: - test_string = "service1:endpoint1,endpoint2" + test_string = "service1:method1,method2" response = parse_service_pair(test_string) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" response = parse_ignored_endpoints(test_string) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_string = "service1" response = parse_ignored_endpoints(test_string) - assert response == ["service1"] + assert response == ["service1.*"] test_string = ";" response = parse_ignored_endpoints(test_string) assert response == [] - test_string = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_string = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_string) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_string = "" @@ -44,28 +39,28 @@ def test_parse_service_pair(self) -> None: assert response == [] def test_parse_ignored_endpoints_string(self) -> None: - test_string = "service1:endpoint1,endpoint2" + test_string = "service1:method1,method2" response = parse_service_pair(test_string) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" response = parse_ignored_endpoints(test_string) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_string = "service1" response = parse_ignored_endpoints(test_string) - assert response == ["service1"] + assert response == ["service1.*"] test_string = ";" response = parse_ignored_endpoints(test_string) assert response == [] - test_string = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_string = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_string) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_string = "" @@ -73,67 +68,92 @@ def test_parse_ignored_endpoints_string(self) -> None: assert response == [] def test_parse_ignored_endpoints_dict(self) -> None: - test_dict = {"service1": ["endpoint1", "endpoint2"]} + test_dict = {"service1": ["method1", "method2"]} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] - test_dict = {"SERVICE1": ["ENDPOINT1", "ENDPOINT2"]} + test_dict = {"SERVICE1": ["method1", "method2"]} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_dict = {"service1": [], "service2": []} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_dict = {"service1": []} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1"] + assert response == ["service1.*"] test_dict = {} response = parse_ignored_endpoints_dict(test_dict) assert response == [] def test_parse_ignored_endpoints(self) -> None: - test_pair = "service1:endpoint1,endpoint2" + test_pair = "service1:method1,method2" response = parse_ignored_endpoints(test_pair) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_pair = "service1;service2" response = parse_ignored_endpoints(test_pair) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_pair = "service1" response = parse_ignored_endpoints(test_pair) - assert response == ["service1"] + assert response == ["service1.*"] test_pair = ";" response = parse_ignored_endpoints(test_pair) assert response == [] - test_pair = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_pair = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_pair) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_pair = "" response = parse_ignored_endpoints(test_pair) assert response == [] - test_dict = {"service1": ["endpoint1", "endpoint2"]} + test_dict = {"service1": ["method1", "method2"]} response = parse_ignored_endpoints(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_dict = {"service1": [], "service2": []} response = parse_ignored_endpoints(test_dict) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_dict = {"service1": []} response = parse_ignored_endpoints(test_dict) - assert response == ["service1"] + assert response == ["service1.*"] test_dict = {} response = parse_ignored_endpoints(test_dict) assert response == [] + + def test_parse_endpoints_of_service(self) -> None: + test_ignore_endpoints = { + "service1": ["method1", "method2"], + "service2": ["method3", "method4"], + "kafka": [ + { + "methods": ["method5", "method6"], + "endpoints": ["endpoint1", "endpoint2"], + } + ], + } + ignore_endpoints = [] + for service, methods in test_ignore_endpoints.items(): + ignore_endpoints.extend(parse_endpoints_of_service([], service, methods)) + assert ignore_endpoints == [ + "service1.method1", + "service1.method2", + "service2.method3", + "service2.method4", + "kafka.method5.endpoint1", + "kafka.method5.endpoint2", + "kafka.method6.endpoint1", + "kafka.method6.endpoint2", + ] diff --git a/tests/util/test_config_reader.py b/tests/util/test_config_reader.py new file mode 100644 index 00000000..0c9c3ede --- /dev/null +++ b/tests/util/test_config_reader.py @@ -0,0 +1,63 @@ +# (c) Copyright IBM Corp. 2025 + +import logging + +import pytest + +from instana.util.config import parse_ignored_endpoints_from_yaml + + +class TestConfigReader: + def test_load_configuration_with_tracing( + self, caplog: pytest.LogCaptureFixture + ) -> None: + caplog.set_level(logging.DEBUG, logger="instana") + + ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + # test with tracing + assert ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + + assert ( + 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' + not in caplog.messages + ) + + def test_load_configuration_legacy(self, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.DEBUG, logger="instana") + + ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-2.yaml" + ) + assert ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + assert ( + 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' + in caplog.messages + ) diff --git a/tests/util/test_configuration-1.yaml b/tests/util/test_configuration-1.yaml new file mode 100644 index 00000000..af890a35 --- /dev/null +++ b/tests/util/test_configuration-1.yaml @@ -0,0 +1,19 @@ +# (c) Copyright IBM Corp. 2025 + +# service-level configuration, aligning with in-code settings +tracing: + ignore-endpoints: + redis: + - get + - type + dynamodb: + - query + kafka: + - methods: ["consume", "send"] + endpoints: ["span-topic", "topic1", "topic2"] + - methods: ["consume"] + endpoints: ["topic3"] + - methods: ["*"] # Applied to all methods + endpoints: ["span-topic", "topic4"] + # - methods: ["consume", "send"] + # endpoints: ["*"] # Applied to all topics diff --git a/tests/util/test_configuration-2.yaml b/tests/util/test_configuration-2.yaml new file mode 100644 index 00000000..582202f0 --- /dev/null +++ b/tests/util/test_configuration-2.yaml @@ -0,0 +1,19 @@ +# (c) Copyright IBM Corp. 2025 + +# service-level configuration, aligning with in-code settings +com.instana.tracing: + ignore-endpoints: + redis: + - get + - type + dynamodb: + - query + kafka: + - methods: ["consume", "send"] + endpoints: ["span-topic", "topic1", "topic2"] + - methods: ["consume"] + endpoints: ["topic3"] + - methods: ["*"] # Applied to all methods + endpoints: ["span-topic", "topic4"] + # - methods: ["consume", "send"] + # endpoints: ["*"] # Applied to all topics diff --git a/tests/util/test_span_utils.py b/tests/util/test_span_utils.py index 32f623c6..22f67653 100644 --- a/tests/util/test_span_utils.py +++ b/tests/util/test_span_utils.py @@ -1,15 +1,22 @@ -from typing import Optional +from typing import List, Optional import pytest -from instana.util.span_utils import get_operation_specifier +from instana.util.span_utils import get_operation_specifiers @pytest.mark.parametrize( "span_name, expected_result", - [("something", ""), ("redis", "command"), ("dynamodb", "op")], + [ + ("something", ["", ""]), + ("redis", ["command", ""]), + ("dynamodb", ["op", ""]), + ("kafka", ["access", "service"]), + ], ) -def test_get_operation_specifier( - span_name: str, expected_result: Optional[str] +def test_get_operation_specifiers( + span_name: str, + expected_result: Optional[List[str]], ) -> None: - response_redis = get_operation_specifier(span_name) - assert response_redis == expected_result + operation_specifier, service_specifier = get_operation_specifiers(span_name) + assert operation_specifier == expected_result[0] + assert service_specifier == expected_result[1] From f9693d91042292fa4adda8c49ec2306378d07742 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Fri, 9 May 2025 11:51:35 +0200 Subject: [PATCH 2/2] feat: added span filtering for confluent-kafka module Signed-off-by: Cagri Yonca --- tests/clients/kafka/test_confluent_kafka.py | 129 +++++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index 0b995e81..3f140f32 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -1,5 +1,6 @@ # (c) Copyright IBM Corp. 2025 +import os from typing import Generator import pytest @@ -11,7 +12,9 @@ from confluent_kafka.admin import AdminClient, NewTopic from opentelemetry.trace import SpanKind +from instana.options import StandardOptions from instana.singletons import agent, tracer +from instana.util.config import parse_ignored_endpoints_from_yaml from tests.helpers import get_first_span_by_filter, testenv @@ -29,7 +32,7 @@ def _resource(self) -> Generator[None, None, None]: self.kafka_client = AdminClient(self.kafka_config) try: - topics = self.kafka_client.create_topics( # noqa: F841 + _ = self.kafka_client.create_topics( # noqa: F841 [ NewTopic( testenv["kafka_topic"], @@ -187,6 +190,130 @@ def test_trace_confluent_kafka_error(self) -> None: == "num_messages must be between 0 and 1000000 (1M)" ) + def test_ignore_confluent_kafka(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test"): + self.producer.produce(testenv["kafka_topic"], b"raw_bytes") + self.producer.flush(timeout=10) + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + def test_ignore_confluent_kafka_producer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:produce" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 5 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 3 + + def test_ignore_confluent_kafka_consumer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 5 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 3 + + def test_ignore_confluent_specific_topic(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] = ( + "tests/util/test_configuration-1.yaml" + ) + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=1, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + def test_ignore_confluent_specific_topic_with_config_file(self) -> None: + agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=1, timeout=60) + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + def test_confluent_kafka_consumer_root_exit(self) -> None: agent.options.allow_exit_as_root = True