Skip to content

fix: log and trace processor memory leak #4449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 9, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4458](https://github.com/open-telemetry/opentelemetry-python/pull/4458))
- pylint-ci updated python version to 3.13
([#4450](https://github.com/open-telemetry/opentelemetry-python/pull/4450))
- Fix memory leak in Log & Trace exporter
([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449))

## Version 1.30.0/0.51b0 (2025-02-03)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import sys
import threading
import weakref
from os import environ, linesep
from time import time_ns
from typing import IO, Callable, Deque, List, Optional, Sequence
Expand Down Expand Up @@ -216,7 +217,8 @@ def __init__(
self._log_records = [None] * self._max_export_batch_size
self._worker_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
self._pid = os.getpid()

def _at_fork_reinit(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def __init__(
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)

os.register_at_fork(
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda
)
elif self._export_interval_millis <= 0:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sys
import threading
import typing
import weakref
from enum import Enum
from os import environ, linesep
from time import time_ns
Expand Down Expand Up @@ -200,7 +201,8 @@ def __init__(
self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
self._pid = os.getpid()

def on_start(
Expand Down
19 changes: 19 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

# pylint: disable=protected-access
import gc
import logging
import multiprocessing
import os
import time
import unittest
import weakref
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -619,6 +621,23 @@ def _target():

log_record_processor.shutdown()

def test_batch_log_record_processor_gc(self):
# Given a BatchLogRecordProcessor
exporter = InMemoryLogExporter()
processor = BatchLogRecordProcessor(exporter)
weak_ref = weakref.ref(processor)
processor.shutdown()

# When the processor is garbage collected
del processor
gc.collect()

# Then the reference to the processor should no longer exist
self.assertIsNone(
weak_ref(),
"The BatchLogRecordProcessor object created by this test wasn't garbage collected",
)


class TestConsoleLogExporter(unittest.TestCase):
def test_export(self): # pylint: disable=no-self-use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

# pylint: disable=protected-access,invalid-name,no-self-use

import gc
import math
import weakref
from logging import WARNING
from time import sleep, time_ns
from typing import Optional, Sequence
Expand Down Expand Up @@ -257,3 +259,24 @@ def test_metric_timeout_does_not_kill_worker_thread(self):
sleep(0.1)
self.assertTrue(pmr._daemon_thread.is_alive())
pmr.shutdown()

def test_metric_exporer_gc(self):
# Given a PeriodicExportingMetricReader
exporter = FakeMetricsExporter(
preferred_aggregation={
Counter: LastValueAggregation(),
},
)
processor = PeriodicExportingMetricReader(exporter)
weak_ref = weakref.ref(processor)
processor.shutdown()

# When we garbage collect the reader
del processor
gc.collect()

# Then the reference to the reader should no longer exist
self.assertIsNone(
weak_ref(),
"The PeriodicExportingMetricReader object created by this test wasn't garbage collected",
)
19 changes: 19 additions & 0 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import multiprocessing
import os
import threading
import time
import unittest
import weakref
from concurrent.futures import ThreadPoolExecutor
from logging import WARNING
from platform import python_implementation, system
Expand Down Expand Up @@ -585,6 +587,23 @@ def test_batch_span_processor_parameters(self):
max_export_batch_size=512,
)

def test_batch_span_processor_gc(self):
# Given a BatchSpanProcessor
exporter = MySpanExporter(destination=[])
processor = export.BatchSpanProcessor(exporter)
weak_ref = weakref.ref(processor)
processor.shutdown()

# When the processor is garbage collected
del processor
gc.collect()

# Then the reference to the processor should no longer exist
self.assertIsNone(
weak_ref(),
"The BatchSpanProcessor object created by this test wasn't garbage collected",
)


class TestConsoleSpanExporter(unittest.TestCase):
def test_export(self): # pylint: disable=no-self-use
Expand Down