Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete duplicated OTLP Exporter tests, move them to the mixin unit test. Fix broken shutdown unit test. #4504

Merged
merged 16 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,4 @@ def _exporting(self) -> str:
Returns a string that describes the overall exporter, to be used in
warning messages.
"""
pass
pass
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def _translate_data(
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)
def shutdown(self, timeout_millis: float = 30_000) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis)

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,20 @@
# pylint: disable=too-many-lines

import time
from concurrent.futures import ThreadPoolExecutor
from os.path import dirname
from unittest import TestCase
from unittest.mock import patch

from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
from google.protobuf.json_format import MessageToDict
from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module
RetryInfo,
)
from grpc import ChannelCredentials, Compression, StatusCode, server
from grpc import ChannelCredentials, Compression

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common._internal import _encode_value
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.version import __version__
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
ExportLogsServiceResponse,
)
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import (
LogsServiceServicer,
add_LogsServiceServicer_to_server,
)
from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue
from opentelemetry.proto.common.v1.common_pb2 import (
Expand All @@ -53,7 +40,6 @@
Resource as OTLPResource,
)
from opentelemetry.sdk._logs import LogData, LogRecord
from opentelemetry.sdk._logs.export import LogExportResult
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE,
OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE,
Expand All @@ -70,62 +56,9 @@
THIS_DIR = dirname(__file__)


class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.UNAVAILABLE)

context.send_initial_metadata(
(("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),)
)
context.set_trailing_metadata(
(
(
"google.rpc.retryinfo-bin",
RetryInfo(
retry_delay=Duration(nanos=int(1e7))
).SerializeToString(),
),
)
)

return ExportLogsServiceResponse()


class LogsServiceServicerUNAVAILABLE(LogsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.UNAVAILABLE)

return ExportLogsServiceResponse()


class LogsServiceServicerSUCCESS(LogsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.OK)

return ExportLogsServiceResponse()


class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.ALREADY_EXISTS)

return ExportLogsServiceResponse()


class TestOTLPLogExporter(TestCase):
def setUp(self):
self.exporter = OTLPLogExporter()

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

self.log_data_1 = LogData(
log_record=LogRecord(
timestamp=int(time.time() * 1e9),
Expand Down Expand Up @@ -204,9 +137,6 @@ def setUp(self):
),
)

def tearDown(self):
self.server.stop(None)

def test_exporting(self):
# pylint: disable=protected-access
self.assertEqual(self.exporter._exporting, "logs")
Expand Down Expand Up @@ -296,145 +226,6 @@ def test_env_variables_with_only_certificate(

mock_logger_error.assert_not_called()

@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
@patch(
"opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._stub"
)
# pylint: disable=unused-argument
def test_no_credentials_error(
self, mock_ssl_channel, mock_secure, mock_stub
):
OTLPLogExporter(insecure=False)
self.assertTrue(mock_ssl_channel.called)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
expected_endpoint = "localhost:4317"
endpoints = [
(
"http://localhost:4317",
None,
mock_insecure,
),
(
"localhost:4317",
None,
mock_secure,
),
(
"http://localhost:4317",
True,
mock_insecure,
),
(
"localhost:4317",
True,
mock_insecure,
),
(
"http://localhost:4317",
False,
mock_secure,
),
(
"localhost:4317",
False,
mock_secure,
),
(
"https://localhost:4317",
False,
mock_secure,
),
(
"https://localhost:4317",
None,
mock_secure,
),
(
"https://localhost:4317",
True,
mock_secure,
),
]

# pylint: disable=C0209
for endpoint, insecure, mock_method in endpoints:
OTLPLogExporter(endpoint=endpoint, insecure=insecure)
self.assertEqual(
1,
mock_method.call_count,
"expected {} to be called for {} {}".format(
mock_method, endpoint, insecure
),
)
self.assertEqual(
expected_endpoint,
mock_method.call_args[0][0],
"expected {} got {} {}".format(
expected_endpoint, mock_method.call_args[0][0], endpoint
),
)
mock_method.reset_mock()

def test_otlp_headers_from_env(self):
# pylint: disable=protected-access
self.assertEqual(
self.exporter._headers,
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):
mock_expo.configure_mock(**{"return_value": [0.01]})

add_LogsServiceServicer_to_server(
LogsServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.log_data_1]), LogExportResult.FAILURE
)
mock_sleep.assert_called_with(0.01)

@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):
mock_expo.configure_mock(**{"return_value": [1]})

add_LogsServiceServicer_to_server(
LogsServiceServicerUNAVAILABLEDelay(), self.server
)
self.assertEqual(
self.exporter.export([self.log_data_1]), LogExportResult.FAILURE
)
mock_sleep.assert_called_with(0.01)

def test_success(self):
add_LogsServiceServicer_to_server(
LogsServiceServicerSUCCESS(), self.server
)
self.assertEqual(
self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS
)

def test_failure(self):
add_LogsServiceServicer_to_server(
LogsServiceServicerALREADY_EXISTS(), self.server
)
self.assertEqual(
self.exporter.export([self.log_data_1]), LogExportResult.FAILURE
)

def export_log_and_deserialize(self, log_data):
# pylint: disable=protected-access
translated_data = self.exporter._translate_data([log_data])
Expand Down
Loading
Loading