Skip to content

feat(dsm): add manual checkpoint parameter to consume API #13646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions ddtrace/data_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@
from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64


def set_consume_checkpoint(typ, source, carrier_get):
def set_consume_checkpoint(typ, source, carrier_get, manual_checkpoint=True):
"""
:param typ: The type of the checkpoint, usually the streaming technology being used.
Examples include kafka, kinesis, sns etc. (str)
:param source: The source of data. This can be a topic, exchange or stream name. (str)
:param carrier_get: A function used to extract context from the carrier (function (str) -> str)
:param manual_checkpoint: Whether this checkpoint was manually set. Keep true if manually instrumenting.
Manual instrumentation always overrides automatic instrumentation in the case a call is both
manually and automatically instrumented. (bool)

:returns DataStreamsCtx | None
"""
if ddtrace.config._data_streams_enabled:
processor = ddtrace.tracer.data_streams_processor
processor.decode_pathway_b64(carrier_get(PROPAGATION_KEY_BASE_64))
return processor.set_checkpoint(["type:" + typ, "topic:" + source, "direction:in", "manual_checkpoint:true"])
tags = ["type:" + typ, "topic:" + source, "direction:in"]
if manual_checkpoint:
tags.append("manual_checkpoint:true")
return processor.set_checkpoint(tags)


def set_produce_checkpoint(typ, target, carrier_set):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
DSM: Add flag in set_consume_checkpoint() to indicate if DSM checkpoint was manually set.
37 changes: 37 additions & 0 deletions tests/datastreams/test_public_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,40 @@ def test_public_api():
sorted(["direction:in", "manual_checkpoint:true", "type:kinesis", "topic:stream-123"]), parent_hash
)
assert got.hash == expected


def test_manual_checkpoint_behavior():
headers = {}
mocked_tracer = MockedTracer()
with mock.patch("ddtrace.tracer", new=mocked_tracer):
with mock.patch("ddtrace.config", new=MockedConfig()):
with mock.patch.object(mocked_tracer.data_streams_processor, "set_checkpoint") as mock_set_checkpoint:
set_consume_checkpoint("kinesis", "stream-123", headers.get)
called_tags = mock_set_checkpoint.call_args[0][0]
assert "manual_checkpoint:true" in called_tags

mock_set_checkpoint.reset_mock()
set_consume_checkpoint("kinesis", "stream-123", headers.get, manual_checkpoint=False)
called_tags = mock_set_checkpoint.call_args[0][0]
assert "manual_checkpoint:true" not in called_tags


def test_manual_checkpoint_hash_behavior():
headers = {}
mocked_tracer = MockedTracer()
with mock.patch("ddtrace.tracer", new=mocked_tracer):
with mock.patch("ddtrace.config", new=MockedConfig()):
got_with_manual = set_consume_checkpoint("kinesis", "stream-123", headers.get)
got_without_manual = set_consume_checkpoint("kinesis", "stream-123", headers.get, manual_checkpoint=False)

ctx = DataStreamsCtx(mocked_tracer.data_streams_processor, 0, 0, 0)

tags_with_manual = ["direction:in", "manual_checkpoint:true", "type:kinesis", "topic:stream-123"]
expected_with_manual = ctx._compute_hash(sorted(tags_with_manual), 0)

tags_without_manual = ["direction:in", "type:kinesis", "topic:stream-123"]
expected_without_manual = ctx._compute_hash(sorted(tags_without_manual), 0)

assert got_with_manual.hash == expected_with_manual
assert got_without_manual.hash == expected_without_manual
assert got_with_manual.hash != got_without_manual.hash
Loading