Skip to content

Commit 12bcd45

Browse files
DylanRussell and xrmx authored
Fix flaky test that tests lots of threads calling emit. Make sure tests shut down the batch exporter. (#4600)
* Initial commit to improve shutdown behavior. * Remove print statements and changes from shutdown branch * Remove unused imports/deps * fix lint issue --------- Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 8675ab7 commit 12bcd45

File tree

4 files changed

+54
-35
lines changed

4 files changed

+54
-35
lines changed

opentelemetry-sdk/test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ zipp==3.19.2
1414
-e tests/opentelemetry-test-utils
1515
-e opentelemetry-api
1616
-e opentelemetry-semantic-conventions
17-
-e opentelemetry-sdk
17+
-e opentelemetry-sdk

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import os
1818
import time
1919
import unittest
20+
from concurrent.futures import ThreadPoolExecutor
2021
from sys import version_info
2122
from unittest.mock import Mock, patch
2223

@@ -331,9 +332,12 @@ def test_simple_log_record_processor_different_msg_types_with_formatter(
331332
self.assertEqual(expected, emitted)
332333

333334

335+
# Many more test cases for the BatchLogRecordProcessor exist under
336+
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
337+
# Important: make sure to call .shutdown() on the BatchLogRecordProcessor
338+
# before the end of the test, otherwise the worker thread will continue
339+
# to run after the end of the test.
334340
class TestBatchLogRecordProcessor(unittest.TestCase):
335-
# Many more test cases for the BatchLogRecordProcessor exist under
336-
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
337341
def test_emit_call_log_record(self):
338342
exporter = InMemoryLogExporter()
339343
log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter))
@@ -346,6 +350,34 @@ def test_emit_call_log_record(self):
346350

347351
logger.error("error")
348352
self.assertEqual(log_record_processor.emit.call_count, 1)
353+
log_record_processor.shutdown()
354+
355+
def test_with_multiple_threads(self): # pylint: disable=no-self-use
356+
exporter = InMemoryLogExporter()
357+
batch_processor = BatchLogRecordProcessor(
358+
exporter,
359+
max_queue_size=3000,
360+
max_export_batch_size=50,
361+
schedule_delay_millis=30000,
362+
export_timeout_millis=500,
363+
)
364+
365+
def bulk_emit(num_emit):
366+
for _ in range(num_emit):
367+
batch_processor.emit(EMPTY_LOG)
368+
369+
total_expected_logs = 0
370+
with ThreadPoolExecutor(max_workers=69) as executor:
371+
for num_logs_to_emit in range(1, 70):
372+
executor.submit(bulk_emit, num_logs_to_emit)
373+
total_expected_logs += num_logs_to_emit
374+
375+
executor.shutdown()
376+
377+
batch_processor.shutdown()
378+
# Wait a bit for logs to flush.
379+
time.sleep(2)
380+
assert len(exporter.get_finished_logs()) == total_expected_logs
349381

350382
@mark.skipif(
351383
version_info < (3, 10),
@@ -404,6 +436,7 @@ def test_args(self):
404436
self.assertEqual(
405437
log_record_processor._batch_processor._export_timeout_millis, 15000
406438
)
439+
log_record_processor.shutdown()
407440

408441
@patch.dict(
409442
"os.environ",
@@ -432,6 +465,7 @@ def test_env_vars(self):
432465
self.assertEqual(
433466
log_record_processor._batch_processor._export_timeout_millis, 15000
434467
)
468+
log_record_processor.shutdown()
435469

436470
def test_args_defaults(self):
437471
exporter = InMemoryLogExporter()
@@ -451,6 +485,7 @@ def test_args_defaults(self):
451485
self.assertEqual(
452486
log_record_processor._batch_processor._export_timeout_millis, 30000
453487
)
488+
log_record_processor.shutdown()
454489

455490
@patch.dict(
456491
"os.environ",
@@ -481,6 +516,7 @@ def test_args_env_var_value_error(self):
481516
self.assertEqual(
482517
log_record_processor._batch_processor._export_timeout_millis, 30000
483518
)
519+
log_record_processor.shutdown()
484520

485521
def test_args_none_defaults(self):
486522
exporter = InMemoryLogExporter()
@@ -506,6 +542,7 @@ def test_args_none_defaults(self):
506542
self.assertEqual(
507543
log_record_processor._batch_processor._export_timeout_millis, 30000
508544
)
545+
log_record_processor.shutdown()
509546

510547
def test_validation_negative_max_queue_size(self):
511548
exporter = InMemoryLogExporter()

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,10 @@
1919
import time
2020
import unittest
2121
import weakref
22-
from concurrent.futures import ThreadPoolExecutor
2322
from platform import system
24-
from sys import version_info
2523
from unittest.mock import Mock
2624

2725
import pytest
28-
from pytest import mark
2926

3027
from opentelemetry.sdk._logs import (
3128
LogData,
@@ -53,6 +50,8 @@
5350

5451

5552
# BatchLogRecordProcessor/BatchSpanProcessor initialize and use BatchProcessor.
53+
# Important: make sure to call .shutdown() before the end of the test,
54+
# otherwise the worker thread will continue to run after the end of the test.
5655
@pytest.mark.parametrize(
5756
"batch_processor_class,telemetry",
5857
[(BatchLogRecordProcessor, EMPTY_LOG), (BatchSpanProcessor, BASIC_SPAN)],
@@ -80,6 +79,7 @@ def test_telemetry_exported_once_batch_size_reached(
8079
after_export = time.time_ns()
8180
# Shows the worker's 30 second sleep was interrupted within a second.
8281
assert after_export - before_export < 1e9
82+
batch_processor.shutdown()
8383

8484
# pylint: disable=no-self-use
8585
def test_telemetry_exported_once_schedule_delay_reached(
@@ -96,6 +96,7 @@ def test_telemetry_exported_once_schedule_delay_reached(
9696
batch_processor._batch_processor.emit(telemetry)
9797
time.sleep(0.2)
9898
exporter.export.assert_called_once_with([telemetry])
99+
batch_processor.shutdown()
99100

100101
def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
101102
self, batch_processor_class, telemetry
@@ -136,33 +137,7 @@ def test_force_flush_flushes_telemetry(
136137
batch_processor._batch_processor.emit(telemetry)
137138
batch_processor.force_flush()
138139
exporter.export.assert_called_once_with([telemetry for _ in range(10)])
139-
140-
@mark.skipif(
141-
system() == "Windows" or version_info < (3, 9),
142-
reason="This test randomly fails on windows and python 3.8.",
143-
)
144-
def test_with_multiple_threads(self, batch_processor_class, telemetry):
145-
exporter = Mock()
146-
batch_processor = batch_processor_class(
147-
exporter,
148-
max_queue_size=3000,
149-
max_export_batch_size=1000,
150-
schedule_delay_millis=30000,
151-
export_timeout_millis=500,
152-
)
153-
154-
def bulk_emit_and_flush(num_emit):
155-
for _ in range(num_emit):
156-
batch_processor._batch_processor.emit(telemetry)
157-
batch_processor.force_flush()
158-
159-
with ThreadPoolExecutor(max_workers=69) as executor:
160-
for idx in range(69):
161-
executor.submit(bulk_emit_and_flush, idx + 1)
162-
163-
executor.shutdown()
164-
# 69 calls to force flush.
165-
assert exporter.export.call_count == 69
140+
batch_processor.shutdown()
166141

167142
@unittest.skipUnless(
168143
hasattr(os, "fork"),
@@ -202,6 +177,7 @@ def child(conn):
202177
batch_processor.force_flush()
203178
# Single export for the telemetry we emitted at the start of the test.
204179
assert exporter.export.call_count == 1
180+
batch_processor.shutdown()
205181

206182
def test_record_processor_is_garbage_collected(
207183
self, batch_processor_class, telemetry

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,12 @@ def test_simple_span_processor_not_sampled(self):
144144
self.assertListEqual([], spans_names_list)
145145

146146

147+
# Many more test cases for the BatchSpanProcessor exist under
148+
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
149+
# Important: make sure to call .shutdown() on the BatchSpanProcessor
150+
# before the end of the test, otherwise the worker thread will continue
151+
# to run after the end of the test.
147152
class TestBatchSpanProcessor(unittest.TestCase):
148-
# Many more test cases for the BatchSpanProcessor exist under
149-
# opentelemetry-sdk/tests/shared_internal/test_batch_processor.py.
150153
@mock.patch.dict(
151154
"os.environ",
152155
{
@@ -173,6 +176,7 @@ def test_args_env_var(self):
173176
self.assertEqual(
174177
batch_span_processor._batch_processor._export_timeout_millis, 4
175178
)
179+
batch_span_processor.shutdown()
176180

177181
def test_args_env_var_defaults(self):
178182
batch_span_processor = export.BatchSpanProcessor(
@@ -191,6 +195,7 @@ def test_args_env_var_defaults(self):
191195
self.assertEqual(
192196
batch_span_processor._batch_processor._export_timeout_millis, 30000
193197
)
198+
batch_span_processor.shutdown()
194199

195200
@mock.patch.dict(
196201
"os.environ",
@@ -220,6 +225,7 @@ def test_args_env_var_value_error(self):
220225
self.assertEqual(
221226
batch_span_processor._batch_processor._export_timeout_millis, 30000
222227
)
228+
batch_span_processor.shutdown()
223229

224230
def test_on_start_accepts_parent_context(self):
225231
# pylint: disable=no-self-use

0 commit comments

Comments
 (0)