Skip to content

Commit 96aed93

Browse files
committed
fix: improve confluent kafka's global variable access
Signed-off-by: Cagri Yonca <[email protected]>
1 parent 5e5bac7 commit 96aed93

File tree

2 files changed

+25
-18
lines changed

2 files changed

+25
-18
lines changed

src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
from instana.span.span import InstanaSpan
1818
from instana.util.traceutils import get_tracer_tuple
1919

20-
consumer_token = None
21-
consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")
20+
consumer_token = contextvars.ContextVar(
21+
"confluent_kafka_consumer_token", default=None
22+
)
23+
consumer_span = contextvars.ContextVar(
24+
"confluent_kafka_consumer_span", default=None
25+
)
2226

2327
# As confluent_kafka is a wrapper around the C-developed librdkafka
2428
# (provided automatically via binary wheels), we have to create new classes
@@ -178,24 +182,23 @@ def create_span(
178182
) # pragma: no cover
179183

180184
def save_consumer_span_into_context(span: "InstanaSpan") -> None:
181-
global consumer_token
182185
ctx = trace.set_span_in_context(span)
183-
consumer_token = context.attach(ctx)
186+
token = context.attach(ctx)
187+
consumer_token.set(token)
184188
consumer_span.set(span)
185189

186190
def close_consumer_span(span: "InstanaSpan") -> None:
187-
global consumer_token
188191
if span.is_recording():
189192
span.end()
190193
consumer_span.set(None)
191-
if consumer_token is not None:
192-
context.detach(consumer_token)
193-
consumer_token = None
194+
token = consumer_token.get(None)
195+
if token is not None:
196+
context.detach(token)
197+
consumer_token.set(None)
194198

195199
def clear_context() -> None:
196-
global consumer_token
197200
context.attach(trace.set_span_in_context(None))
198-
consumer_token = None
201+
consumer_token.set(None)
199202
consumer_span.set(None)
200203

201204
def trace_kafka_consume(
@@ -253,6 +256,10 @@ def trace_kafka_poll(
253256
res = wrapped(*args, **kwargs)
254257
if res:
255258
create_span("poll", res.topic(), res.headers())
259+
else:
260+
span = consumer_span.get(None)
261+
if span is not None:
262+
close_consumer_span(span)
256263
return res
257264
except Exception as exc:
258265
exception = exc

tests/clients/kafka/test_confluent_kafka.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -709,19 +709,19 @@ def test_save_consumer_span_into_context(self, span: "InstanaSpan") -> None:
709709
"""Test save_consumer_span_into_context function."""
710710
# Verify initial state
711711
assert consumer_span.get(None) is None
712-
assert confluent_kafka_python.consumer_token is None
712+
assert confluent_kafka_python.consumer_token.get(None) is None
713713

714714
# Save span into context
715715
save_consumer_span_into_context(span)
716716

717717
# Verify token is stored
718-
assert confluent_kafka_python.consumer_token is not None
718+
assert confluent_kafka_python.consumer_token.get(None) is not None
719719

720720
def test_close_consumer_span_recording_span(self, span: "InstanaSpan") -> None:
721721
"""Test close_consumer_span with a recording span."""
722722
# Save span into context first
723723
save_consumer_span_into_context(span)
724-
assert confluent_kafka_python.consumer_token is not None
724+
assert confluent_kafka_python.consumer_token.get(None) is not None
725725

726726
# Verify span is recording
727727
assert span.is_recording()
@@ -732,7 +732,7 @@ def test_close_consumer_span_recording_span(self, span: "InstanaSpan") -> None:
732732
# Verify span was ended and context cleared
733733
assert not span.is_recording()
734734
assert consumer_span.get(None) is None
735-
assert confluent_kafka_python.consumer_token is None
735+
assert confluent_kafka_python.consumer_token.get(None) is None
736736

737737
def test_clear_context(self, span: "InstanaSpan") -> None:
738738
"""Test clear_context function."""
@@ -741,14 +741,14 @@ def test_clear_context(self, span: "InstanaSpan") -> None:
741741

742742
# Verify context has data
743743
assert consumer_span.get(None) == span
744-
assert confluent_kafka_python.consumer_token is not None
744+
assert confluent_kafka_python.consumer_token.get(None) is not None
745745

746746
# Clear context
747747
clear_context()
748748

749749
# Verify all context is cleared
750750
assert consumer_span.get(None) is None
751-
assert confluent_kafka_python.consumer_token is None
751+
assert confluent_kafka_python.consumer_token.get(None) is None
752752

753753
def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None:
754754
"""Test trace_kafka_close handles exceptions and still cleans up spans."""
@@ -757,7 +757,7 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None
757757

758758
# Verify span is in context
759759
assert consumer_span.get(None) == span
760-
assert confluent_kafka_python.consumer_token is not None
760+
assert confluent_kafka_python.consumer_token.get(None) is not None
761761

762762
# Mock a wrapped function that raises an exception
763763
mock_wrapped = Mock(side_effect=Exception("Close operation failed"))
@@ -772,7 +772,7 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None
772772

773773
# Verify that despite the exception, the span was cleaned up
774774
assert consumer_span.get(None) is None
775-
assert confluent_kafka_python.consumer_token is None
775+
assert confluent_kafka_python.consumer_token.get(None) is None
776776

777777
# Verify span was ended
778778
assert not span.is_recording()

0 commit comments

Comments
 (0)