diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index a31679fb0d5..8f3677784b1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -15,33 +15,20 @@ # pylint: disable=too-many-lines import time -from concurrent.futures import ThreadPoolExecutor from os.path import dirname from unittest import TestCase from unittest.mock import patch -from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module - Duration, -) from google.protobuf.json_format import MessageToDict -from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module - RetryInfo, -) -from grpc import ChannelCredentials, Compression, StatusCode, server +from grpc import ChannelCredentials, Compression from opentelemetry._logs import SeverityNumber from opentelemetry.exporter.otlp.proto.common._internal import _encode_value from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( OTLPLogExporter, ) -from opentelemetry.exporter.otlp.proto.grpc.version import __version__ from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, - ExportLogsServiceResponse, -) -from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( - LogsServiceServicer, - add_LogsServiceServicer_to_server, ) from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue from opentelemetry.proto.common.v1.common_pb2 import ( @@ -53,7 +40,6 @@ Resource as OTLPResource, ) from opentelemetry.sdk._logs import LogData, LogRecord -from opentelemetry.sdk._logs.export import LogExportResult from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE, @@ -70,62 +56,9 @@ THIS_DIR = dirname(__file__) -class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - context.send_initial_metadata( - (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) - ) - context.set_trailing_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo( - retry_delay=Duration(nanos=int(1e7)) - ).SerializeToString(), - ), - ) - ) - - return ExportLogsServiceResponse() - - -class LogsServiceServicerUNAVAILABLE(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - return ExportLogsServiceResponse() - - -class LogsServiceServicerSUCCESS(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.OK) - - return ExportLogsServiceResponse() - - -class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.ALREADY_EXISTS) - - return ExportLogsServiceResponse() - - class TestOTLPLogExporter(TestCase): def setUp(self): self.exporter = OTLPLogExporter() - - self.server = server(ThreadPoolExecutor(max_workers=10)) - - self.server.add_insecure_port("127.0.0.1:4317") - - self.server.start() - self.log_data_1 = LogData( log_record=LogRecord( timestamp=int(time.time() * 1e9), @@ -204,9 +137,6 @@ def setUp(self): ), ) - def tearDown(self): - self.server.stop(None) - def test_exporting(self): # pylint: disable=protected-access self.assertEqual(self.exporter._exporting, "logs") @@ -296,145 +226,6 @@ def test_env_variables_with_only_certificate( mock_logger_error.assert_not_called() - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - @patch( - "opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._stub" - ) - # pylint: disable=unused-argument - def test_no_credentials_error( - self, mock_ssl_channel, mock_secure, mock_stub - ): - OTLPLogExporter(insecure=False) - self.assertTrue(mock_ssl_channel.called) - - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): - expected_endpoint = "localhost:4317" - endpoints = [ - ( - "http://localhost:4317", - None, - mock_insecure, - ), - ( - "localhost:4317", - None, - mock_secure, - ), - ( - "http://localhost:4317", - True, - mock_insecure, - ), - ( - "localhost:4317", - True, - mock_insecure, - ), - ( - "http://localhost:4317", - False, - mock_secure, - ), - ( - "localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - None, - mock_secure, - ), - ( - "https://localhost:4317", - True, - mock_secure, - ), - ] - - # pylint: disable=C0209 - for endpoint, insecure, mock_method in endpoints: - OTLPLogExporter(endpoint=endpoint, insecure=insecure) - self.assertEqual( - 1, - mock_method.call_count, - "expected {} to be called for {} {}".format( - mock_method, endpoint, insecure - ), - ) - self.assertEqual( - expected_endpoint, - mock_method.call_args[0][0], - "expected {} got {} {}".format( - expected_endpoint, mock_method.call_args[0][0], endpoint - ), - ) - mock_method.reset_mock() - - def test_otlp_headers_from_env(self): - # pylint: disable=protected-access - self.assertEqual( - self.exporter._headers, - (("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),), - ) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [0.01]}) - - add_LogsServiceServicer_to_server( - LogsServiceServicerUNAVAILABLE(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(0.01) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) - - add_LogsServiceServicer_to_server( - LogsServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(0.01) - - def test_success(self): - add_LogsServiceServicer_to_server( - LogsServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS - ) - - def test_failure(self): - add_LogsServiceServicer_to_server( - LogsServiceServicerALREADY_EXISTS(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - def export_log_and_deserialize(self, log_data): # pylint: disable=protected-access translated_data = self.exporter._translate_data([log_data]) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index d9b02611a07..656d9a6cb79 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -13,10 +13,10 @@ # limitations under the License. import threading +import time +from concurrent.futures import ThreadPoolExecutor from logging import WARNING -from time import time_ns -from types import MethodType -from typing import Sequence +from typing import Any, Optional, Sequence from unittest import TestCase from unittest.mock import Mock, patch @@ -26,20 +26,215 @@ from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module RetryInfo, ) -from grpc import Compression +from grpc import Compression, StatusCode, server -from opentelemetry.exporter.otlp.proto.grpc.exporter import ( - ExportServiceRequestT, +from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( + encode_spans, +) +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401 InvalidCompressionValueException, OTLPExporterMixin, - RpcError, - SDKDataT, - StatusCode, environ_to_compression, ) +from opentelemetry.exporter.otlp.proto.grpc.version import __version__ +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceRequest, + ExportTraceServiceResponse, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import ( + TraceServiceServicer, + TraceServiceStub, + add_TraceServiceServicer_to_server, +) +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_COMPRESSION, +) +from opentelemetry.sdk.trace import ReadableSpan, _Span +from opentelemetry.sdk.trace.export import ( + SpanExporter, + SpanExportResult, +) + + +# The below tests use this test SpanExporter and Spans, but are testing the +# underlying behavior in the mixin. A MetricExporter or LogExporter could +# just as easily be used. +class OTLPSpanExporterForTesting( + SpanExporter, + OTLPExporterMixin[ + ReadableSpan, ExportTraceServiceRequest, SpanExportResult + ], +): + _result = SpanExportResult + _stub = TraceServiceStub + + def _translate_data( + self, data: Sequence[ReadableSpan] + ) -> ExportTraceServiceRequest: + return encode_spans(data) + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + return self._export(spans) + + @property + def _exporting(self): + return "traces" + + def shutdown(self, timeout_millis=30_000): + return OTLPExporterMixin.shutdown(self, timeout_millis) + + +class TraceServiceServicerWithExportParams(TraceServiceServicer): + def __init__( + self, + export_result: StatusCode, + optional_export_sleep: Optional[float] = None, + optional_export_retry_millis: Optional[float] = None, + ): + self.export_result = export_result + self.optional_export_sleep = optional_export_sleep + self.optional_export_retry_millis = optional_export_retry_millis + + # pylint: disable=invalid-name,unused-argument + def Export(self, request, context): + if self.optional_export_sleep: + time.sleep(self.optional_export_sleep) + if self.optional_export_retry_millis: + context.send_initial_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo().SerializeToString(), + ), + ) + ) + context.set_trailing_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo( + retry_delay=Duration( + nanos=int(self.optional_export_retry_millis) + ) + ).SerializeToString(), + ), + ) + ) + context.set_code(self.export_result) + + return ExportTraceServiceResponse() + + +class ThreadWithReturnValue(threading.Thread): + def __init__( + self, + target=None, + args=(), + ): + super().__init__(target=target, args=args) + self._return = None + + def run(self): + try: + if self._target is not None: # type: ignore + self._return = self._target(*self._args, **self._kwargs) # type: ignore + finally: + # Avoid a refcycle if the thread is running a function with + # an argument that has a member that points to the thread. + del self._target, self._args, self._kwargs # type: ignore + + def join(self, timeout: Optional[float] = None) -> Any: + super().join(timeout=timeout) + return self._return class TestOTLPExporterMixin(TestCase): + def setUp(self): + self.server = server(ThreadPoolExecutor(max_workers=10)) + + self.server.add_insecure_port("127.0.0.1:4317") + + self.server.start() + self.exporter = OTLPSpanExporterForTesting(insecure=True) + self.span = _Span( + "a", + context=Mock( + **{ + "trace_state": {"a": "b", "c": "d"}, + "span_id": 10217189687419569865, + "trace_id": 67545097771067222548457157018666467027, + } + ), + ) + + def tearDown(self): + self.server.stop(None) + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): + expected_endpoint = "localhost:4317" + endpoints = [ + ( + "http://localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + None, + mock_secure, + ), + ( + "http://localhost:4317", + True, + mock_insecure, + ), + ( + "localhost:4317", + True, + mock_insecure, + ), + ( + "http://localhost:4317", + False, + mock_secure, + ), + ( + "localhost:4317", + False, + mock_secure, + ), + ( + "https://localhost:4317", + False, + mock_secure, + ), + ( + "https://localhost:4317", + None, + mock_secure, + ), + ( + "https://localhost:4317", + True, + mock_secure, + ), + ] + for endpoint, insecure, mock_method in endpoints: + OTLPSpanExporterForTesting(endpoint=endpoint, insecure=insecure) + self.assertEqual( + 1, + mock_method.call_count, + f"expected {mock_method} to be called for {endpoint} {insecure}", + ) + self.assertEqual( + expected_endpoint, + mock_method.call_args[0][0], + f"expected {expected_endpoint} got {mock_method.call_args[0][0]} {endpoint}", + ) + mock_method.reset_mock() + def test_environ_to_compression(self): with patch.dict( "os.environ", @@ -64,147 +259,187 @@ def test_environ_to_compression(self): with self.assertRaises(InvalidCompressionValueException): environ_to_compression("test_invalid") + # pylint: disable=no-self-use + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") + @patch.dict("os.environ", {}) + def test_otlp_exporter_otlp_compression_unspecified( + self, mock_insecure_channel + ): + """No env or kwarg should be NoCompression""" + OTLPSpanExporterForTesting(insecure=True) + mock_insecure_channel.assert_called_once_with( + "localhost:4317", compression=Compression.NoCompression + ) + + # pylint: disable=no-self-use, disable=unused-argument @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" ) - def test_export_warning(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [0]}) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + @patch.dict("os.environ", {}) + def test_no_credentials_ssl_channel_called( + self, secure_channel, mock_ssl_channel + ): + OTLPSpanExporterForTesting(insecure=False) + self.assertTrue(mock_ssl_channel.called) + + # pylint: disable=no-self-use + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") + @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) + def test_otlp_exporter_otlp_compression_envvar( + self, mock_insecure_channel + ): + """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" + OTLPSpanExporterForTesting(insecure=True) + mock_insecure_channel.assert_called_once_with( + "localhost:4317", compression=Compression.Gzip + ) - rpc_error = RpcError() + def test_shutdown(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK), + self.server, + ) + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.SUCCESS + ) + self.exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.FAILURE + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) - def code(self): - return None + def test_shutdown_wait_last_export(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams( + StatusCode.OK, optional_export_sleep=1 + ), + self.server, + ) - rpc_error.code = MethodType(code, rpc_error) + export_thread = ThreadWithReturnValue( + target=self.exporter.export, args=([self.span],) + ) + export_thread.start() + # Wait a bit for exporter to get lock and make export call. + time.sleep(0.25) + # pylint: disable=protected-access + self.assertTrue(self.exporter._export_lock.locked()) + self.exporter.shutdown(timeout_millis=3000) + # pylint: disable=protected-access + self.assertTrue(self.exporter._shutdown) + self.assertEqual(export_thread.join(), SpanExportResult.SUCCESS) + + def test_shutdown_doesnot_wait_last_export(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams( + StatusCode.OK, optional_export_sleep=3 + ), + self.server, + ) - class OTLPMockExporter(OTLPExporterMixin): - _result = Mock() - _stub = Mock( - **{"return_value": Mock(**{"Export.side_effect": rpc_error})} - ) + export_thread = ThreadWithReturnValue( + target=self.exporter.export, args=([self.span],) + ) + export_thread.start() + # Wait for exporter to get lock and make export call. + time.sleep(0.25) + # pylint: disable=protected-access + self.assertTrue(self.exporter._export_lock.locked()) + # Set to 1 seconds, so the 3 second server-side delay will not be reached. + self.exporter.shutdown(timeout_millis=1000) + # pylint: disable=protected-access + self.assertTrue(self.exporter._shutdown) + self.assertEqual(export_thread.join(), None) - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass + def test_export_over_closed_grpc_channel(self): + # pylint: disable=protected-access - @property - def _exporting(self) -> str: - return "mock" + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK), + self.server, + ) + self.exporter.export([self.span]) + self.exporter.shutdown() + data = self.exporter._translate_data([self.span]) + with self.assertRaises(ValueError) as err: + self.exporter._client.Export(request=data) + self.assertEqual( + str(err.exception), "Cannot invoke RPC on closed channel!" + ) - otlp_mock_exporter = OTLPMockExporter() + @patch( + "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + ) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable(self, mock_sleep, mock_expo): + mock_expo.configure_mock(**{"return_value": [0.01]}) + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE), + self.server, + ) + result = self.exporter.export([self.span]) + self.assertEqual(result, SpanExportResult.FAILURE) + mock_sleep.assert_called_with(0.01) + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable_delay(self, mock_sleep): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams( + StatusCode.UNAVAILABLE, + optional_export_sleep=None, + optional_export_retry_millis=1e7, + ), + self.server, + ) with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - otlp_mock_exporter._export(Mock()) self.assertEqual( - warning.records[0].message, - "Failed to export mock to localhost:4317, error code: None", + self.exporter.export([self.span]), SpanExportResult.FAILURE ) + mock_sleep.assert_called_with(0.01) - def code(self): # pylint: disable=function-redefined - return StatusCode.CANCELLED - - def trailing_metadata(self): - return {} - - rpc_error.code = MethodType(code, rpc_error) - rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) - - with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - otlp_mock_exporter._export([]) self.assertEqual( warning.records[0].message, ( - "Transient error StatusCode.CANCELLED encountered " - "while exporting mock to localhost:4317, retrying in 0s." + "Transient error StatusCode.UNAVAILABLE encountered " + "while exporting traces to localhost:4317, retrying in 0.01s." ), ) - def test_shutdown(self): - result_mock = Mock() - - class OTLPMockExporter(OTLPExporterMixin): - _result = result_mock - _stub = Mock(**{"return_value": Mock()}) - - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass - - @property - def _exporting(self) -> str: - return "mock" + def test_success(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK), + self.server, + ) + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.SUCCESS + ) - otlp_mock_exporter = OTLPMockExporter() + def test_otlp_headers_from_env(self): + # pylint: disable=protected-access + # This ensures that there is no other header than standard user-agent. + self.assertEqual( + self.exporter._headers, + (("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),), + ) + def test_permanent_failure(self): with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.SUCCESS + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams( + StatusCode.ALREADY_EXISTS + ), + self.server, ) - otlp_mock_exporter.shutdown() - # pylint: disable=protected-access self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.FAILURE + self.exporter.export([self.span]), SpanExportResult.FAILURE ) self.assertEqual( warning.records[0].message, - "Exporter already shutdown, ignoring batch", + "Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS", ) - - def test_shutdown_wait_last_export(self): - result_mock = Mock() - rpc_error = RpcError() - - def code(self): - return StatusCode.UNAVAILABLE - - def trailing_metadata(self): - return { - "google.rpc.retryinfo-bin": RetryInfo( - retry_delay=Duration(nanos=int(1e7)) - ).SerializeToString() - } - - rpc_error.code = MethodType(code, rpc_error) - rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) - - class OTLPMockExporter(OTLPExporterMixin): - _result = result_mock - _stub = Mock( - **{"return_value": Mock(**{"Export.side_effect": rpc_error})} - ) - - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass - - @property - def _exporting(self) -> str: - return "mock" - - otlp_mock_exporter = OTLPMockExporter() - - # pylint: disable=protected-access - export_thread = threading.Thread( - target=otlp_mock_exporter._export, args=({},) - ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._export_lock.locked()) - # delay is 1 second while the default shutdown timeout is 30_000 milliseconds - start_time = time_ns() - otlp_mock_exporter.shutdown() - now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(otlp_mock_exporter._export_lock.locked()) - finally: - export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 9cd7ac38358..2ea12f660fb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -12,39 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=too-many-lines - -import threading -from concurrent.futures import ThreadPoolExecutor - # pylint: disable=too-many-lines from logging import WARNING from os import environ from os.path import dirname -from time import time_ns from typing import List from unittest import TestCase from unittest.mock import patch -from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module - Duration, -) -from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module - RetryInfo, -) -from grpc import ChannelCredentials, Compression, StatusCode, server +from grpc import ChannelCredentials, Compression from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter, ) from opentelemetry.exporter.otlp.proto.grpc.version import __version__ -from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( - ExportMetricsServiceResponse, -) -from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import ( - MetricsServiceServicer, - add_MetricsServiceServicer_to_server, -) from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_COMPRESSION, @@ -71,7 +52,6 @@ AggregationTemporality, Gauge, Metric, - MetricExportResult, MetricsData, NumberDataPoint, ResourceMetrics, @@ -90,72 +70,12 @@ THIS_DIR = dirname(__file__) -class MetricsServiceServicerUNAVAILABLEDelay(MetricsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - context.send_initial_metadata( - (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) - ) - context.set_trailing_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo( - retry_delay=Duration(nanos=int(1e7)) - ).SerializeToString(), - ), - ) - ) - - return ExportMetricsServiceResponse() - - -class MetricsServiceServicerUNAVAILABLE(MetricsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - return ExportMetricsServiceResponse() - - -class MetricsServiceServicerUNKNOWN(MetricsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNKNOWN) - - return ExportMetricsServiceResponse() - - -class MetricsServiceServicerSUCCESS(MetricsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.OK) - - return ExportMetricsServiceResponse() - - -class MetricsServiceServicerALREADY_EXISTS(MetricsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.ALREADY_EXISTS) - - return ExportMetricsServiceResponse() - - class TestOTLPMetricExporter(TestCase): # pylint: disable=too-many-public-methods def setUp(self): self.exporter = OTLPMetricExporter() - self.server = server(ThreadPoolExecutor(max_workers=10)) - - self.server.add_insecure_port("127.0.0.1:4317") - - self.server.start() - self.metrics = { "sum_int": MetricsData( resource_metrics=[ @@ -181,9 +101,6 @@ def setUp(self): ) } - def tearDown(self): - self.server.stop(None) - def test_exporting(self): # pylint: disable=protected-access self.assertEqual(self.exporter._exporting, "metrics") @@ -371,92 +288,6 @@ def test_otlp_insecure_from_env(self, mock_insecure): f"expected {mock_insecure} to be called", ) - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): - expected_endpoint = "localhost:4317" - endpoints = [ - ( - "http://localhost:4317", - None, - mock_insecure, - ), - ( - "localhost:4317", - None, - mock_secure, - ), - ( - "http://localhost:4317", - True, - mock_insecure, - ), - ( - "localhost:4317", - True, - mock_insecure, - ), - ( - "http://localhost:4317", - False, - mock_secure, - ), - ( - "localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - None, - mock_secure, - ), - ( - "https://localhost:4317", - True, - mock_secure, - ), - ] - # pylint: disable=C0209 - for endpoint, insecure, mock_method in endpoints: - OTLPMetricExporter(endpoint=endpoint, insecure=insecure) - self.assertEqual( - 1, - mock_method.call_count, - "expected {} to be called for {} {}".format( - mock_method, endpoint, insecure - ), - ) - self.assertEqual( - expected_endpoint, - mock_method.call_args[0][0], - "expected {} got {} {}".format( - expected_endpoint, mock_method.call_args[0][0], endpoint - ), - ) - mock_method.reset_mock() - - # pylint: disable=no-self-use - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) - def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel, mock_expo - ): - """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" - OTLPMetricExporter(insecure=True) - mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.Gzip - ) - # pylint: disable=no-self-use @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) @@ -469,92 +300,6 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): "localhost:4317", compression=Compression.NoCompression ) - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch.dict("os.environ", {}) - def test_otlp_exporter_otlp_compression_unspecified( - self, mock_insecure_channel - ): - """No env or kwarg should be NoCompression""" - OTLPMetricExporter(insecure=True) - mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression - ) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [0.01]}) - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNAVAILABLE(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_called_with(0.01) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_called_with(0.01) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.logger.error") - def test_unknown_logs(self, mock_logger_error, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNKNOWN(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_not_called() - mock_logger_error.assert_called_with( - "Failed to export %s to %s, error code: %s", - "metrics", - "localhost:4317", - StatusCode.UNKNOWN, - exc_info=True, - ) - - def test_success(self): - add_MetricsServiceServicer_to_server( - MetricsServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.SUCCESS, - ) - - def test_failure(self): - add_MetricsServiceServicer_to_server( - MetricsServiceServicerALREADY_EXISTS(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - def test_split_metrics_data_many_data_points(self): # GIVEN metrics_data = MetricsData( @@ -830,65 +575,6 @@ def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) mock_secure_channel.assert_called() - def test_shutdown(self): - add_MetricsServiceServicer_to_server( - MetricsServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.SUCCESS, - ) - self.exporter.shutdown() - with self.assertLogs(level=WARNING) as warning: - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - self.assertEqual( - warning.records[0].message, - "Exporter already shutdown, ignoring batch", - ) - self.exporter = OTLPMetricExporter() - - def test_shutdown_wait_last_export(self): - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNAVAILABLEDelay(), self.server - ) - - export_thread = threading.Thread( - target=self.exporter.export, args=(self.metrics["sum_int"],) - ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(self.exporter._export_lock.locked()) - # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds - start_time = time_ns() - self.exporter.shutdown() - now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - self.assertTrue(self.exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(self.exporter._export_lock.locked()) - finally: - export_thread.join() - - def test_export_over_closed_grpc_channel(self): - # pylint: disable=protected-access - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerSUCCESS(), self.server - ) - self.exporter.export(self.metrics["sum_int"]) - self.exporter.shutdown() - data = self.exporter._translate_data(self.metrics["sum_int"]) - with self.assertRaises(ValueError) as err: - self.exporter._client.Export(request=data) - self.assertEqual( - str(err.exception), "Cannot invoke RPC on closed channel!" - ) - def test_aggregation_temporality(self): # pylint: disable=protected-access diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index f29b7fc611c..73d8d6c7a20 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -15,20 +15,10 @@ # pylint: disable=too-many-lines import os -import threading -from concurrent.futures import ThreadPoolExecutor -from logging import WARNING -from time import time_ns from unittest import TestCase from unittest.mock import Mock, PropertyMock, patch -from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module - Duration, -) -from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module - RetryInfo, -) -from grpc import ChannelCredentials, Compression, StatusCode, server +from grpc import ChannelCredentials, Compression from opentelemetry.attributes import BoundedAttributes from opentelemetry.exporter.otlp.proto.common._internal import ( @@ -40,11 +30,6 @@ from opentelemetry.exporter.otlp.proto.grpc.version import __version__ from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( ExportTraceServiceRequest, - ExportTraceServiceResponse, -) -from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import ( - TraceServiceServicer, - add_TraceServiceServicer_to_server, ) from opentelemetry.proto.common.v1.common_pb2 import ( AnyValue, @@ -80,7 +65,6 @@ from opentelemetry.sdk.trace import TracerProvider, _Span from opentelemetry.sdk.trace.export import ( SimpleSpanProcessor, - SpanExportResult, ) from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.test.spantestutil import ( @@ -90,52 +74,6 @@ THIS_DIR = os.path.dirname(__file__) -class TraceServiceServicerUNAVAILABLEDelay(TraceServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - context.send_initial_metadata( - (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) - ) - context.set_trailing_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo( - retry_delay=Duration(nanos=int(1e7)) - ).SerializeToString(), - ), - ) - ) - - return ExportTraceServiceResponse() - - -class TraceServiceServicerUNAVAILABLE(TraceServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - return ExportTraceServiceResponse() - - -class TraceServiceServicerSUCCESS(TraceServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.OK) - - return ExportTraceServiceResponse() - - -class TraceServiceServicerALREADY_EXISTS(TraceServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.ALREADY_EXISTS) - - return ExportTraceServiceResponse() - - class TestOTLPSpanExporter(TestCase): # pylint: disable=too-many-public-methods @@ -145,12 +83,6 @@ def setUp(self): tracer_provider.add_span_processor(SimpleSpanProcessor(self.exporter)) self.tracer = tracer_provider.get_tracer(__name__) - self.server = server(ThreadPoolExecutor(max_workers=10)) - - self.server.add_insecure_port("127.0.0.1:4317") - - self.server.start() - event_mock = Mock( **{ "timestamp": 1591240820506462784, @@ -232,9 +164,6 @@ def setUp(self): self.span3.start() self.span3.end() - def tearDown(self): - self.server.stop(None) - def test_exporting(self): # pylint: disable=protected-access self.assertEqual(self.exporter._exporting, "traces") @@ -397,85 +326,6 @@ def test_otlp_insecure_from_env(self, mock_insecure): f"expected {mock_insecure} to be called", ) - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): - """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" - expected_endpoint = "localhost:4317" - endpoints = [ - ( - "http://localhost:4317", - None, - mock_insecure, - ), - ( - "localhost:4317", - None, - mock_secure, - ), - ( - "http://localhost:4317", - True, - mock_insecure, - ), - ( - "localhost:4317", - True, - mock_insecure, - ), - ( - "http://localhost:4317", - False, - mock_secure, - ), - ( - "localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - None, - mock_secure, - ), - ( - "https://localhost:4317", - True, - mock_secure, - ), - ] - for endpoint, insecure, mock_method in endpoints: - OTLPSpanExporter(endpoint=endpoint, insecure=insecure) - self.assertEqual( - 1, - mock_method.call_count, - f"expected {mock_method} to be called for {endpoint} {insecure}", - ) - self.assertEqual( - expected_endpoint, - mock_method.call_args[0][0], - f"expected {expected_endpoint} got {mock_method.call_args[0][0]} {endpoint}", - ) - mock_method.reset_mock() - - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) - def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel - ): - """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" - OTLPSpanExporter(insecure=True) - mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.Gzip - ) - # pylint: disable=no-self-use @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) @@ -486,18 +336,6 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): "localhost:4317", compression=Compression.NoCompression ) - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch.dict("os.environ", {}) - def test_otlp_exporter_otlp_compression_unspecified( - self, mock_insecure_channel - ): - """No env or kwarg should be NoCompression""" - OTLPSpanExporter(insecure=True) - mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression - ) - # pylint: disable=no-self-use @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict( @@ -515,65 +353,6 @@ def test_otlp_exporter_otlp_compression_precendence( "localhost:4317", compression=Compression.Gzip ) - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - # pylint: disable=unused-argument - def test_otlp_headers(self, mock_ssl_channel, mock_secure): - exporter = OTLPSpanExporter() - # pylint: disable=protected-access - # This ensures that there is no other header than standard user-agent. - self.assertEqual( - exporter._headers, - (("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),), - ) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [0.01]}) - - add_TraceServiceServicer_to_server( - TraceServiceServicerUNAVAILABLE(), self.server - ) - result = self.exporter.export([self.span]) - self.assertEqual(result, SpanExportResult.FAILURE) - mock_sleep.assert_called_with(0.01) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) - - add_TraceServiceServicer_to_server( - TraceServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE - ) - mock_sleep.assert_called_with(0.01) - - def test_success(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.SUCCESS - ) - - def test_failure(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerALREADY_EXISTS(), self.server - ) - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE - ) - def test_translate_spans(self): expected = ExportTraceServiceRequest( resource_spans=[ @@ -976,62 +755,6 @@ def test_dropped_values(self): .dropped_attributes_count, ) - def test_shutdown(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.SUCCESS - ) - self.exporter.shutdown() - with self.assertLogs(level=WARNING) as warning: - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE - ) - self.assertEqual( - warning.records[0].message, - "Exporter already shutdown, ignoring batch", - ) - - def test_shutdown_wait_last_export(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerUNAVAILABLEDelay(), self.server - ) - - export_thread = threading.Thread( - target=self.exporter.export, args=([self.span],) - ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(self.exporter._export_lock.locked()) - # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds - start_time = time_ns() - self.exporter.shutdown() - now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - self.assertTrue(self.exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(self.exporter._export_lock.locked()) - finally: - export_thread.join() - - def test_export_over_closed_grpc_channel(self): - # pylint: disable=protected-access - - add_TraceServiceServicer_to_server( - TraceServiceServicerSUCCESS(), self.server - ) - self.exporter.export([self.span]) - self.exporter.shutdown() - data = self.exporter._translate_data([self.span]) - with self.assertRaises(ValueError) as err: - self.exporter._client.Export(request=data) - self.assertEqual( - str(err.exception), "Cannot invoke RPC on closed channel!" - ) - def _create_span_with_status(status: SDKStatus): span = _Span(