Skip to content

Commit 588a94e

Browse files
committedApr 25, 2025·
Another commit

File tree

3 files changed

+49
-60
lines changed

3 files changed

+49
-60
lines changed
 

‎opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16+
import abc
1617
import enum
1718
import logging
1819
import sys
@@ -30,7 +31,7 @@
3031
set_value,
3132
)
3233
from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
33-
from opentelemetry.sdk._shared_internal import BatchProcessor, LogExporter
34+
from opentelemetry.sdk._shared_internal import BatchProcessor
3435
from opentelemetry.sdk.environment_variables import (
3536
OTEL_BLRP_EXPORT_TIMEOUT,
3637
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
@@ -54,7 +55,29 @@ class LogExportResult(enum.Enum):
5455

5556

5657

58+
class LogExporter(abc.ABC):
59+
"""Interface for exporting logs.
60+
Interface to be implemented by services that want to export logs received
61+
in their own format.
62+
To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
63+
log processor.
64+
"""
65+
66+
@abc.abstractmethod
67+
def export(self, batch: Sequence[LogData]):
68+
"""Exports a batch of logs.
69+
Args:
70+
batch: The list of `LogData` objects to be exported
71+
Returns:
72+
The result of the export
73+
"""
74+
75+
@abc.abstractmethod
76+
def shutdown(self):
77+
"""Shuts down the exporter.
5778
79+
Called when the SDK is shut down.
80+
"""
5881

5982
class ConsoleLogExporter(LogExporter):
6083
"""Implementation of :class:`LogExporter` that prints log records to the

‎opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

+12-45
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,19 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import abc
15+
from __future__ import annotations
16+
1617
import collections
1718
import enum
1819
import logging
1920
import os
2021
import threading
2122
from abc import ABC
2223
from typing import (
24+
TYPE_CHECKING,
2325
Deque,
2426
Optional,
2527
Union,
26-
Sequence
2728
)
2829

2930
from opentelemetry.context import (
@@ -32,48 +33,14 @@
3233
detach,
3334
set_value,
3435
)
35-
from opentelemetry.sdk._logs import LogRecord
36-
from opentelemetry.sdk._logs.export import LogExporter
37-
from opentelemetry.sdk.trace import Span
38-
from opentelemetry.sdk.trace.export import SpanExporter
3936
from opentelemetry.util._once import Once
4037

41-
from opentelemetry.context import (
42-
_SUPPRESS_INSTRUMENTATION_KEY,
43-
attach,
44-
detach,
45-
set_value,
46-
)
47-
from opentelemetry.sdk._logs import LogData,
48-
49-
50-
class LogExporter(abc.ABC):
51-
"""Interface for exporting logs.
38+
if TYPE_CHECKING:
39+
from opentelemetry.sdk._logs import LogRecord
40+
from opentelemetry.sdk._logs.export import LogExporter
41+
from opentelemetry.sdk.trace import Span
42+
from opentelemetry.sdk.trace.export import SpanExporter
5243

53-
Interface to be implemented by services that want to export logs received
54-
in their own format.
55-
56-
To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
57-
log processor.
58-
"""
59-
60-
@abc.abstractmethod
61-
def export(self, batch: Sequence[LogData]):
62-
"""Exports a batch of logs.
63-
64-
Args:
65-
batch: The list of `LogData` objects to be exported
66-
67-
Returns:
68-
The result of the export
69-
"""
70-
71-
@abc.abstractmethod
72-
def shutdown(self):
73-
"""Shuts down the exporter.
74-
75-
Called when the SDK is shut down.
76-
"""
7744

7845
class BatchExportStrategy(enum.Enum):
7946
EXPORT_ALL = 0
@@ -82,11 +49,11 @@ class BatchExportStrategy(enum.Enum):
8249

8350

8451
class BatchProcessor(ABC):
85-
_queue: Deque[Union[LogRecord | Span]]
52+
_queue: Deque[Union["LogRecord" | "Span"]]
8653

8754
def __init__(
8855
self,
89-
exporter: Union[LogExporter | SpanExporter],
56+
exporter: Union["LogExporter" | "SpanExporter"],
9057
schedule_delay_millis: float,
9158
max_export_batch_size: int,
9259
export_timeout_millis: float,
@@ -188,7 +155,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
188155
)
189156
detach(token)
190157

191-
def emit(self, data: Union[LogRecord | Span]) -> None:
158+
def emit(self, data: Union["LogRecord" | "Span"]) -> None:
192159
if self._shutdown:
193160
self._logger.info(
194161
"Shutdown called, ignoring {}.".format(self._exporting)
@@ -220,4 +187,4 @@ def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
220187
if self._shutdown:
221188
return
222189
# Blocking call to export.
223-
self._export(BatchExportStrategy.EXPORT_ALL)
190+
self._export(BatchExportStrategy.EXPORT_ALL)

‎opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

+13-14
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
LogRecord,
2626
)
2727
from opentelemetry.sdk._logs.export import (
28-
BatchLogRecordProcessor,
2928
InMemoryLogExporter,
3029
)
3130
from opentelemetry.sdk._shared_internal import BatchProcessor
@@ -46,7 +45,7 @@ def test_logs_exported_once_batch_size_reached(self):
4645
max_export_batch_size=15,
4746
# Will not reach this during the test, this sleep should be interrupted when batch size is reached.
4847
schedule_delay_millis=30000,
49-
exporting="Logs",
48+
exporting="Log",
5049
export_timeout_millis=500,
5150
)
5251
before_export = time.time_ns()
@@ -62,12 +61,12 @@ def test_logs_exported_once_batch_size_reached(self):
6261
# pylint: disable=no-self-use
6362
def test_logs_exported_once_schedule_delay_reached(self):
6463
exporter = Mock()
65-
log_record_processor = BatchLogRecordProcessor(
64+
log_record_processor = BatchProcessor(
6665
exporter=exporter,
6766
max_queue_size=15,
6867
max_export_batch_size=15,
6968
schedule_delay_millis=100,
70-
exporting="Logs",
69+
exporting="Log",
7170
export_timeout_millis=500,
7271
)
7372
log_record_processor.emit(EMPTY_LOG)
@@ -76,13 +75,13 @@ def test_logs_exported_once_schedule_delay_reached(self):
7675

7776
def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
7877
exporter = Mock()
79-
log_record_processor = BatchLogRecordProcessor(
78+
log_record_processor = BatchProcessor(
8079
exporter=exporter,
8180
# Neither of these thresholds should be hit before test ends.
8281
max_queue_size=15,
8382
max_export_batch_size=15,
8483
schedule_delay_millis=30000,
85-
exporting="Logs",
84+
exporting="Log",
8685
export_timeout_millis=500,
8786
)
8887
# This log should be flushed because it was written before shutdown.
@@ -102,13 +101,13 @@ def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
102101
# pylint: disable=no-self-use
103102
def test_force_flush_flushes_logs(self):
104103
exporter = Mock()
105-
log_record_processor = BatchLogRecordProcessor(
104+
log_record_processor = BatchProcessor(
106105
exporter=exporter,
107106
# Neither of these thresholds should be hit before test ends.
108107
max_queue_size=15,
109108
max_export_batch_size=15,
110109
schedule_delay_millis=30000,
111-
exporting="Logs",
110+
exporting="Log",
112111
export_timeout_millis=500,
113112
)
114113
for _ in range(10):
@@ -118,12 +117,12 @@ def test_force_flush_flushes_logs(self):
118117

119118
def test_with_multiple_threads(self):
120119
exporter = InMemoryLogExporter()
121-
log_record_processor = BatchLogRecordProcessor(
120+
log_record_processor = BatchProcessor(
122121
exporter=exporter,
123-
max_queue_size=15,
124-
max_export_batch_size=15,
122+
max_queue_size=3000,
123+
max_export_batch_size=1000,
125124
schedule_delay_millis=30000,
126-
exporting="Logs",
125+
exporting="Log",
127126
export_timeout_millis=500,
128127
)
129128

@@ -147,12 +146,12 @@ def bulk_log_and_flush(num_logs):
147146
)
148147
def test_batch_log_record_processor_fork(self):
149148
exporter = InMemoryLogExporter()
150-
log_record_processor = BatchLogRecordProcessor(
149+
log_record_processor = BatchProcessor(
151150
exporter,
152151
max_queue_size=100,
153152
max_export_batch_size=64,
154153
schedule_delay_millis=30000,
155-
exporting="Logs",
154+
exporting="Log",
156155
export_timeout_millis=500,
157156
)
158157
# These logs should be flushed only from the parent process.

0 commit comments

Comments
 (0)
Please sign in to comment.