
Commit 3497e66

DylanRussell authored, with co-authors lzchen, emdneto, and aabmass
Update BatchSpanProcessor to use new BatchProcessor class (#4580)
* Update `BatchSpanProcessor` to use the new `BatchProcessor` class
* Update changelog
* `fork` does not exist on Windows
* Update `force_flush` to return a bool. Currently `force_flush` ignores its timeout, which is bad, but the behavior before made even less sense
* Fix changelog
* Add backticks around `BatchProcessor`
* Require `export` to be called by position only
* Add a comment that there are additional tests for the BatchSpan/LogProcessor in the shared_internal directory
* Empty commit to bump
* Fix broken test

Co-authored-by: Leighton Chen <[email protected]>
Co-authored-by: Emídio Neto <[email protected]>
Co-authored-by: Aaron Abbott <[email protected]>
1 parent 4145870 commit 3497e66
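The bullet "`fork` does not exist on Windows" refers to the fork handler the old processor installed (and that now lives in the shared `BatchProcessor`): `os.register_at_fork` is POSIX-only, so it must be feature-gated. A minimal sketch of that guard pattern, mirroring the code removed below, with a hypothetical `_WorkerOwner` class standing in for the processor:

import os
import threading
import weakref


class _WorkerOwner:
    """Hypothetical sketch: restart a daemon worker thread after fork()."""

    def __init__(self):
        self._start_worker()
        # os.register_at_fork is POSIX-only; guard it with hasattr so the
        # same module imports cleanly on Windows, where fork() does not exist.
        if hasattr(os, "register_at_fork"):
            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
            os.register_at_fork(after_in_child=lambda: weak_reinit()())

    def _start_worker(self):
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _at_fork_reinit(self):
        # Only the thread that called fork() survives in the child,
        # so a fresh worker thread must be started there.
        self._start_worker()

    def _run(self):
        pass  # the batching loop would live here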

File tree

7 files changed: +96 −581 lines changed


CHANGELOG.md

Lines changed: 4 additions & 3 deletions
@@ -9,9 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - typecheck: add sdk/resources and drop mypy
   ([#4578](https://github.com/open-telemetry/opentelemetry-python/pull/4578))
-- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
-  clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
-  and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
+- Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code
+  and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
+  [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535), and
+  [#4580](https://github.com/open-telemetry/opentelemetry-python/pull/4580)).
 - Remove log messages from `BatchLogRecordProcessor.emit`, this caused the program
   to crash at shutdown with a max recursion error ([#4586](https://github.com/open-telemetry/opentelemetry-python/pull/4586)).
 - Configurable max retry timeout for grpc exporter

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -192,7 +192,7 @@ def emit(self, log_data: LogData) -> None:
     def shutdown(self):
         return self._batch_processor.shutdown()

-    def force_flush(self, timeout_millis: Optional[int] = None):
+    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
         return self._batch_processor.force_flush(timeout_millis)

     @staticmethod
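With the annotation in place, callers can check whether a flush succeeded rather than receiving an implicit None. A minimal usage sketch, assuming the SDK's private logs package (opentelemetry.sdk._logs) keeps its current layout:

from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

provider = LoggerProvider()
processor = BatchLogRecordProcessor(ConsoleLogExporter())
provider.add_log_record_processor(processor)

# force_flush now advertises a bool: True when the queue was drained...
assert processor.force_flush() is True

processor.shutdown()
# ...and False after shutdown, instead of an implicit None.
assert processor.force_flush() is False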

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 5 additions & 3 deletions
@@ -48,7 +48,7 @@ class BatchExportStrategy(enum.Enum):

 class Exporter(Protocol[Telemetry]):
     @abstractmethod
-    def export(self, batch: list[Telemetry]):
+    def export(self, batch: list[Telemetry], /):
         raise NotImplementedError

     @abstractmethod
@@ -191,8 +191,10 @@ def shutdown(self):
         self._worker_thread.join()
         self._exporter.shutdown()

-    def force_flush(self, timeout_millis: Optional[int] = None):
+    # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
+    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
         if self._shutdown:
-            return
+            return False
         # Blocking call to export.
         self._export(BatchExportStrategy.EXPORT_ALL)
+        return True
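The `/` added to `Exporter.export` makes `batch` positional-only, so structurally-typed exporters can name the parameter whatever fits their domain (`spans`, `log_records`) without breaking Protocol compatibility. A standalone sketch of the language feature, using hypothetical names:

from typing import Protocol, TypeVar

Telemetry = TypeVar("Telemetry")


class Exporter(Protocol[Telemetry]):
    # The trailing "/" makes `batch` positional-only: callers may not
    # write exporter.export(batch=...), so implementations are free to
    # rename the parameter.
    def export(self, batch: list[Telemetry], /) -> None: ...


class SpanListExporter:
    # Hypothetical implementation: a different parameter name still
    # satisfies the Protocol because the argument is positional-only.
    def export(self, spans: list[str], /) -> None:
        print(f"exporting {len(spans)} spans")


def flush(exporter: Exporter[str], batch: list[str]) -> None:
    exporter.export(batch)  # positional call is the only legal form


flush(SpanListExporter(), ["span-a", "span-b"])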

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 16 additions & 228 deletions
@@ -13,16 +13,11 @@
 # limitations under the License.
 from __future__ import annotations

-import collections
 import logging
-import os
 import sys
-import threading
 import typing
-import weakref
 from enum import Enum
 from os import environ, linesep
-from time import time_ns

 from opentelemetry.context import (
     _SUPPRESS_INSTRUMENTATION_KEY,
@@ -31,14 +26,14 @@
     detach,
     set_value,
 )
+from opentelemetry.sdk._shared_internal import BatchProcessor
 from opentelemetry.sdk.environment_variables import (
     OTEL_BSP_EXPORT_TIMEOUT,
     OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
     OTEL_BSP_MAX_QUEUE_SIZE,
     OTEL_BSP_SCHEDULE_DELAY,
 )
 from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
-from opentelemetry.util._once import Once

 _DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
 _DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -125,19 +120,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
         return True


-class _FlushRequest:
-    """Represents a request for the BatchSpanProcessor to flush spans."""
-
-    __slots__ = ["event", "num_spans"]
-
-    def __init__(self):
-        self.event = threading.Event()
-        self.num_spans = 0
-
-
-_BSP_RESET_ONCE = Once()
-
-
 class BatchSpanProcessor(SpanProcessor):
     """Batch span processor implementation.

@@ -151,6 +133,8 @@ class BatchSpanProcessor(SpanProcessor):
     - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
     - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
     - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`
+
+    All the logic for emitting spans, shutting down etc. resides in the `BatchProcessor` class.
     """

     def __init__(
@@ -174,6 +158,7 @@ def __init__(
             BatchSpanProcessor._default_max_export_batch_size()
         )

+        # Not used. No way currently to pass timeout to export.
         if export_timeout_millis is None:
             export_timeout_millis = (
                 BatchSpanProcessor._default_export_timeout_millis()
@@ -183,227 +168,30 @@
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )

-        self.span_exporter = span_exporter
-        self.queue = collections.deque([], max_queue_size)  # type: typing.Deque[Span]
-        self.worker_thread = threading.Thread(
-            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+        self._batch_processor = BatchProcessor(
+            span_exporter,
+            schedule_delay_millis,
+            max_export_batch_size,
+            export_timeout_millis,
+            max_queue_size,
+            "Span",
         )
-        self.condition = threading.Condition(threading.Lock())
-        self._flush_request = None  # type: typing.Optional[_FlushRequest]
-        self.schedule_delay_millis = schedule_delay_millis
-        self.max_export_batch_size = max_export_batch_size
-        self.max_queue_size = max_queue_size
-        self.export_timeout_millis = export_timeout_millis
-        self.done = False
-        # flag that indicates that spans are being dropped
-        self._spans_dropped = False
-        # precallocated list to send spans to exporter
-        self.spans_list = [None] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
-        self.worker_thread.start()
-        if hasattr(os, "register_at_fork"):
-            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
-            os.register_at_fork(after_in_child=lambda: weak_reinit()())  # pylint: disable=unnecessary-lambda
-        self._pid = os.getpid()

     def on_start(
         self, span: Span, parent_context: Context | None = None
     ) -> None:
         pass

     def on_end(self, span: ReadableSpan) -> None:
-        if self.done:
-            logger.warning("Already shutdown, dropping span.")
-            return
         if not span.context.trace_flags.sampled:
             return
-        if self._pid != os.getpid():
-            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
-
-        if len(self.queue) == self.max_queue_size:
-            if not self._spans_dropped:
-                logger.warning("Queue is full, likely spans will be dropped.")
-                self._spans_dropped = True
-
-        self.queue.appendleft(span)
-
-        if len(self.queue) >= self.max_export_batch_size:
-            with self.condition:
-                self.condition.notify()
-
-    def _at_fork_reinit(self):
-        self.condition = threading.Condition(threading.Lock())
-        self.queue.clear()
-
-        # worker_thread is local to a process, only the thread that issued fork continues
-        # to exist. A new worker thread must be started in child process.
-        self.worker_thread = threading.Thread(
-            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
-        )
-        self.worker_thread.start()
-        self._pid = os.getpid()
-
-    def worker(self):
-        timeout = self.schedule_delay_millis / 1e3
-        flush_request = None  # type: typing.Optional[_FlushRequest]
-        while not self.done:
-            with self.condition:
-                if self.done:
-                    # done flag may have changed, avoid waiting
-                    break
-                flush_request = self._get_and_unset_flush_request()
-                if (
-                    len(self.queue) < self.max_export_batch_size
-                    and flush_request is None
-                ):
-                    self.condition.wait(timeout)
-                    flush_request = self._get_and_unset_flush_request()
-                    if not self.queue:
-                        # spurious notification, let's wait again, reset timeout
-                        timeout = self.schedule_delay_millis / 1e3
-                        self._notify_flush_request_finished(flush_request)
-                        flush_request = None
-                        continue
-                if self.done:
-                    # missing spans will be sent when calling flush
-                    break
-
-            # subtract the duration of this export call to the next timeout
-            start = time_ns()
-            self._export(flush_request)
-            end = time_ns()
-            duration = (end - start) / 1e9
-            timeout = self.schedule_delay_millis / 1e3 - duration
-
-            self._notify_flush_request_finished(flush_request)
-            flush_request = None
-
-        # there might have been a new flush request while export was running
-        # and before the done flag switched to true
-        with self.condition:
-            shutdown_flush_request = self._get_and_unset_flush_request()
-
-        # be sure that all spans are sent
-        self._drain_queue()
-        self._notify_flush_request_finished(flush_request)
-        self._notify_flush_request_finished(shutdown_flush_request)
-
-    def _get_and_unset_flush_request(
-        self,
-    ) -> typing.Optional[_FlushRequest]:
-        """Returns the current flush request and makes it invisible to the
-        worker thread for subsequent calls.
-        """
-        flush_request = self._flush_request
-        self._flush_request = None
-        if flush_request is not None:
-            flush_request.num_spans = len(self.queue)
-        return flush_request
-
-    @staticmethod
-    def _notify_flush_request_finished(
-        flush_request: typing.Optional[_FlushRequest],
-    ):
-        """Notifies the flush initiator(s) waiting on the given request/event
-        that the flush operation was finished.
-        """
-        if flush_request is not None:
-            flush_request.event.set()
-
-    def _get_or_create_flush_request(self) -> _FlushRequest:
-        """Either returns the current active flush event or creates a new one.
+        self._batch_processor.emit(span)

-        The flush event will be visible and read by the worker thread before an
-        export operation starts. Callers of a flush operation may wait on the
-        returned event to be notified when the flush/export operation was
-        finished.
+    def shutdown(self):
+        return self._batch_processor.shutdown()

-        This method is not thread-safe, i.e. callers need to take care about
-        synchronization/locking.
-        """
-        if self._flush_request is None:
-            self._flush_request = _FlushRequest()
-        return self._flush_request
-
-    def _export(self, flush_request: typing.Optional[_FlushRequest]):
-        """Exports spans considering the given flush_request.
-
-        In case of a given flush_requests spans are exported in batches until
-        the number of exported spans reached or exceeded the number of spans in
-        the flush request.
-        In no flush_request was given at most max_export_batch_size spans are
-        exported.
-        """
-        if not flush_request:
-            self._export_batch()
-            return
-
-        num_spans = flush_request.num_spans
-        while self.queue:
-            num_exported = self._export_batch()
-            num_spans -= num_exported
-
-            if num_spans <= 0:
-                break
-
-    def _export_batch(self) -> int:
-        """Exports at most max_export_batch_size spans and returns the number of
-        exported spans.
-        """
-        idx = 0
-        # currently only a single thread acts as consumer, so queue.pop() will
-        # not raise an exception
-        while idx < self.max_export_batch_size and self.queue:
-            self.spans_list[idx] = self.queue.pop()
-            idx += 1
-        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
-        try:
-            # Ignore type b/c the Optional[None]+slicing is too "clever"
-            # for mypy
-            self.span_exporter.export(self.spans_list[:idx])  # type: ignore
-        except Exception:  # pylint: disable=broad-exception-caught
-            logger.exception("Exception while exporting Span batch.")
-        detach(token)
-
-        # clean up list
-        for index in range(idx):
-            self.spans_list[index] = None
-        return idx
-
-    def _drain_queue(self):
-        """Export all elements until queue is empty.
-
-        Can only be called from the worker thread context because it invokes
-        `export` that is not thread safe.
-        """
-        while self.queue:
-            self._export_batch()
-
-    def force_flush(self, timeout_millis: int | None = None) -> bool:
-        if timeout_millis is None:
-            timeout_millis = self.export_timeout_millis
-
-        if self.done:
-            logger.warning("Already shutdown, ignoring call to force_flush().")
-            return True
-
-        with self.condition:
-            flush_request = self._get_or_create_flush_request()
-            # signal the worker thread to flush and wait for it to finish
-            self.condition.notify_all()
-
-        # wait for token to be processed
-        ret = flush_request.event.wait(timeout_millis / 1e3)
-        if not ret:
-            logger.warning("Timeout was exceeded in force_flush().")
-        return ret
-
-    def shutdown(self) -> None:
-        # signal the worker thread to finish and then wait for it
-        self.done = True
-        with self.condition:
-            self.condition.notify_all()
-        self.worker_thread.join()
-        self.span_exporter.shutdown()
+    def force_flush(self, timeout_millis: typing.Optional[int] = None) -> bool:
+        return self._batch_processor.force_flush(timeout_millis)

     @staticmethod
     def _default_max_queue_size():
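After the refactor, `BatchSpanProcessor` is a thin wrapper around the shared `BatchProcessor`, and its public surface is unchanged, so existing setup code keeps working. A minimal end-to-end sketch using the public SDK API:

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

provider = TracerProvider()
# All queueing, batching, and the worker thread now live in the shared
# BatchProcessor; this class only forwards on_end/force_flush/shutdown.
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)

tracer = provider.get_tracer("example")
with tracer.start_as_current_span("demo-span"):
    pass  # the span is handed to processor.on_end when it closes

# force_flush drains the queue, and now reports False once shut down.
assert processor.force_flush() is True
processor.shutdown()
assert processor.force_flush() is False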

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 2 additions & 0 deletions
@@ -332,6 +332,8 @@ def test_simple_log_record_processor_different_msg_types_with_formatter(


 class TestBatchLogRecordProcessor(unittest.TestCase):
+    # Many more test cases for the BatchLogRecordProcessor exist under
+    # opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
     def test_emit_call_log_record(self):
         exporter = InMemoryLogExporter()
         log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter))
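The `Mock(wraps=...)` pattern visible in the context lines records calls while still executing the real object, which is how these tests assert delegation. A hedged sketch of that testing style, using only `unittest.mock` and a hypothetical stand-in class:

import unittest
from unittest.mock import Mock


class _FakeProcessor:
    """Hypothetical stand-in for BatchLogRecordProcessor."""

    def emit(self, record):
        return f"emitted {record}"


class TestDelegation(unittest.TestCase):
    def test_emit_is_called_and_still_runs(self):
        processor = Mock(wraps=_FakeProcessor())
        # The wrapped object does the real work...
        self.assertEqual(processor.emit("log"), "emitted log")
        # ...while the Mock records the call for assertions.
        processor.emit.assert_called_once_with("log")


if __name__ == "__main__":
    unittest.main()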
