Skip to content

Commit 423a21c

Browse files
inline with dotnet implementation
1 parent a12e69f commit 423a21c

File tree

3 files changed

+62
-31
lines changed

3 files changed

+62
-31
lines changed

example.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
tracer = trace.get_tracer(__name__)
3333

3434
# Start a span (logs heartbeat and stop events)
35-
with tracer.start_as_current_span("partial_span_1"):
36-
print("partial_span_1 is running")
37-
sleep(8)
35+
with tracer.start_as_current_span("span 1"):
36+
with tracer.start_as_current_span("span 2"):
37+
print("sleeping inside span 2")
38+
sleep(2)
39+
print("sleeping inside span 1")
40+
sleep(5)
41+
print("sleeping outside spans")
42+
sleep(5)

src/partial_span_processor/__init__.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import json
1919
import threading
2020
import time
21-
from queue import Queue
22-
from typing import TYPE_CHECKING, Deque
21+
from typing import TYPE_CHECKING
2322

2423
from google.protobuf import json_format
2524
from opentelemetry._logs.severity import SeverityNumber
@@ -29,6 +28,8 @@
2928
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
3029
from opentelemetry.trace import TraceFlags
3130

31+
from partial_span_processor.peekable_queue import PeekableQueue
32+
3233
if TYPE_CHECKING:
3334
from opentelemetry import context as context_api
3435
from opentelemetry.sdk._logs.export import LogExporter
@@ -79,10 +80,11 @@ def __init__(
7980
self.resource = resource
8081

8182
self.active_spans = {}
82-
self.delayed_heartbeat_spans: Deque[tuple[int, datetime.datetime]] = Deque[
83-
tuple[int, datetime.datetime]]()
83+
self.delayed_heartbeat_spans: PeekableQueue[tuple[int, datetime.datetime]] = \
84+
PeekableQueue()
8485
self.delayed_heartbeat_spans_lookup: set[int] = set()
85-
self.ready_heartbeat_spans: Queue[tuple[int, datetime.datetime]] = Queue()
86+
self.ready_heartbeat_spans: PeekableQueue[
87+
tuple[int, datetime.datetime]] = PeekableQueue()
8688
self.lock = threading.Lock()
8789

8890
self.done = False
@@ -110,18 +112,20 @@ def on_start(self, span: Span,
110112

111113
next_heartbeat_time = datetime.datetime.now() + datetime.timedelta(
112114
milliseconds=self.initial_heartbeat_delay_millis)
113-
self.delayed_heartbeat_spans.appendleft(
115+
self.delayed_heartbeat_spans.put(
114116
(span.context.span_id, next_heartbeat_time))
115117

116118
def on_end(self, span: ReadableSpan) -> None:
119+
is_delayed_heartbeat_pending = False
117120
with self.lock:
118121
self.active_spans.pop(span.context.span_id)
119122

120123
if span.context.span_id in self.delayed_heartbeat_spans_lookup:
124+
is_delayed_heartbeat_pending = True
121125
self.delayed_heartbeat_spans_lookup.remove(span.context.span_id)
122-
self.delayed_heartbeat_spans.remove(
123-
(span.context.span_id, next(iter(self.delayed_heartbeat_spans))[1]))
124-
return
126+
127+
if is_delayed_heartbeat_pending:
128+
return
125129

126130
self.export_log(span, get_stop_attributes())
127131

@@ -188,28 +192,42 @@ def get_log_data(self, span: Span, attributes: dict[str, str]) -> LogData:
188192
)
189193

190194
def process_delayed_heartbeat_spans(self) -> None:
191-
with self.lock:
195+
spans_to_be_logged = []
196+
with (self.lock):
192197
now = datetime.datetime.now()
193-
while self.delayed_heartbeat_spans and self.delayed_heartbeat_spans[-1][
194-
1] <= now:
195-
span_id, _ = self.delayed_heartbeat_spans.pop()
198+
while True:
199+
if self.delayed_heartbeat_spans.empty():
200+
break
201+
202+
(span_id, next_heartbeat_time) = self.delayed_heartbeat_spans.peek()
203+
if next_heartbeat_time > now:
204+
break
205+
196206
self.delayed_heartbeat_spans_lookup.discard(span_id)
207+
self.delayed_heartbeat_spans.get()
197208

198209
span = self.active_spans.get(span_id)
199210
if not span:
200211
continue
201212

202-
self.export_log(span, self.get_heartbeat_attributes())
213+
spans_to_be_logged.append(span)
203214

204215
next_heartbeat_time = now + datetime.timedelta(
205216
milliseconds=self.heartbeat_interval_millis)
206217
self.ready_heartbeat_spans.put((span_id, next_heartbeat_time))
207218

219+
for span in spans_to_be_logged:
220+
self.export_log(span, self.get_heartbeat_attributes())
221+
208222
def process_ready_heartbeat_spans(self) -> None:
223+
spans_to_be_logged = []
209224
now = datetime.datetime.now()
210225
with self.lock:
211-
while not self.ready_heartbeat_spans.empty():
212-
span_id, next_heartbeat_time = self.ready_heartbeat_spans.queue[0]
226+
while True:
227+
if self.ready_heartbeat_spans.empty():
228+
break
229+
230+
(span_id, next_heartbeat_time) = self.ready_heartbeat_spans.peek()
213231
if next_heartbeat_time > now:
214232
break
215233

@@ -219,12 +237,15 @@ def process_ready_heartbeat_spans(self) -> None:
219237
if not span:
220238
continue
221239

222-
self.export_log(span, self.get_heartbeat_attributes())
240+
spans_to_be_logged.append(span)
223241

224242
next_heartbeat_time = now + datetime.timedelta(
225243
milliseconds=self.heartbeat_interval_millis)
226244
self.ready_heartbeat_spans.put((span_id, next_heartbeat_time))
227245

246+
for span in spans_to_be_logged:
247+
self.export_log(span, self.get_heartbeat_attributes())
248+
228249

229250
def get_stop_attributes() -> dict[str, str]:
230251
return {

tests/partial_span_processor/test_partial_span_processor.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,30 +105,35 @@ def test_invalid_process_interval(self):
105105

106106
def test_on_start(self):
107107
span = TestPartialSpanProcessor.create_mock_span()
108-
span_id = span.get_span_context().span_id
108+
expected_span_id = span.get_span_context().span_id
109+
now = datetime.datetime.now()
109110
self.processor.on_start(span)
110111

111-
self.assertIn(span_id, self.processor.active_spans)
112-
self.assertIn(span_id, self.processor.delayed_heartbeat_spans_lookup)
113-
self.assertIn((span_id, unittest.mock.ANY),
114-
self.processor.delayed_heartbeat_spans)
112+
self.assertIn(expected_span_id, self.processor.active_spans)
113+
self.assertIn(expected_span_id,
114+
self.processor.delayed_heartbeat_spans_lookup)
115+
self.assertEqual(self.processor.delayed_heartbeat_spans.qsize(), 1)
116+
(
117+
span_id,
118+
next_heartbeat_time) = self.processor.delayed_heartbeat_spans.get()
119+
self.assertEqual(expected_span_id, span_id)
120+
self.assertGreater(next_heartbeat_time, now)
121+
self.assertEqual(self.log_exporter.get_finished_logs(), ())
115122

116123
def test_on_end_when_initial_heartbeat_not_sent(self):
117124
span = TestPartialSpanProcessor.create_mock_span()
118125
span_id = span.get_span_context().span_id
119126

120127
self.processor.active_spans[span_id] = span
121128
self.processor.delayed_heartbeat_spans_lookup.add(span_id)
122-
self.processor.delayed_heartbeat_spans.append(
123-
(span_id, unittest.mock.ANY))
129+
self.processor.delayed_heartbeat_spans.put((span_id, unittest.mock.ANY))
124130

125131
self.processor.on_end(span)
126132

127133
self.assertNotIn(span_id, self.processor.active_spans)
128134
self.assertNotIn(span_id,
129135
self.processor.delayed_heartbeat_spans_lookup)
130-
self.assertNotIn((span_id, unittest.mock.ANY),
131-
self.processor.delayed_heartbeat_spans)
136+
self.assertFalse(self.processor.delayed_heartbeat_spans.empty())
132137
self.assertEqual(self.log_exporter.get_finished_logs(), ())
133138

134139
def test_on_end_when_initial_heartbeat_sent(self):
@@ -155,15 +160,15 @@ def test_process_delayed_heartbeat_spans(self):
155160

156161
self.processor.active_spans[span_id] = span
157162
now = datetime.datetime.now()
158-
self.processor.delayed_heartbeat_spans.append((span_id, now))
163+
self.processor.delayed_heartbeat_spans.put((span_id, now))
159164
self.processor.delayed_heartbeat_spans_lookup.add(span_id)
160165

161166
with patch("datetime.datetime") as mock_datetime:
162167
mock_datetime.now.return_value = now
163168
self.processor.process_delayed_heartbeat_spans()
164169

165170
self.assertNotIn(span_id, self.processor.delayed_heartbeat_spans_lookup)
166-
self.assertNotIn((span_id, now), self.processor.delayed_heartbeat_spans)
171+
self.assertTrue(self.processor.delayed_heartbeat_spans.empty())
167172

168173
next_heartbeat_time = now + datetime.timedelta(
169174
milliseconds=self.processor.heartbeat_interval_millis)

0 commit comments

Comments
 (0)