Skip to content

Commit 10d7f23

Browse files
authored
feat(weave): support resource attributes for wandb vars
1 parent 665b843 commit 10d7f23

File tree

8 files changed

+172
-126
lines changed

8 files changed

+172
-126
lines changed

tests/trace_server/isolated_client_executor/cross_process_trace_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ def ensure_project_exists(
269269
raise NotImplementedError("ensure_project_exists is not implemented")
270270

271271
# Regular method implementations (reduced duplication)
272-
def otel_export(self, req: tsi.OtelExportReq) -> tsi.OtelExportRes:
272+
def otel_export(self, req: tsi.OTelExportReq) -> tsi.OTelExportRes:
273273
"""Export OTEL traces."""
274274
return self._send_request("otel_export", req)
275275

tests/trace_server/test_opentelemetry.py

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
from typing import Any
77

88
import pytest
9-
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
10-
ExportTraceServiceRequest,
11-
)
129
from opentelemetry.proto.common.v1.common_pb2 import (
1310
AnyValue,
1411
InstrumentationScope,
@@ -105,8 +102,8 @@ def create_test_span():
105102
return span
106103

107104

108-
def create_test_export_request(project_id="test_project"):
109-
"""Create a test ExportTraceServiceRequest with one span."""
105+
def create_test_export_request(project_id="test_project") -> tsi.OTelExportReq:
106+
"""Create a test OTelExportReq with one span."""
110107
span = create_test_span()
111108

112109
# Create instrumentation scope
@@ -131,15 +128,17 @@ def create_test_export_request(project_id="test_project"):
131128
resource_spans.resource.CopyFrom(resource)
132129
resource_spans.scope_spans.append(scope_spans)
133130

134-
# Create traces data
135-
traces_data = TracesData()
136-
traces_data.resource_spans.append(resource_spans)
137-
138-
# Create export request
139-
request = ExportTraceServiceRequest()
140-
request.resource_spans.append(resource_spans)
131+
# Create processed resource spans
132+
processed_span = tsi.ProcessedResourceSpans(
133+
entity="test-entity",
134+
project="test-project",
135+
run_id=None,
136+
resource_spans=resource_spans,
137+
)
141138

142-
return tsi.OtelExportReq(project_id=project_id, traces=request, wb_user_id=None)
139+
return tsi.OTelExportReq(
140+
project_id=project_id, processed_spans=[processed_span], wb_user_id=None
141+
)
143142

144143

145144
def test_otel_export_clickhouse(client: weave_client.WeaveClient):
@@ -152,7 +151,7 @@ def test_otel_export_clickhouse(client: weave_client.WeaveClient):
152151
# Export the otel traces
153152
response = client.server.otel_export(export_req)
154153
# Verify the response is of the correct type
155-
assert isinstance(response, tsi.OtelExportRes)
154+
assert isinstance(response, tsi.OTelExportRes)
156155

157156
# Query the calls
158157
res = client.server.calls_query(
@@ -164,7 +163,7 @@ def test_otel_export_clickhouse(client: weave_client.WeaveClient):
164163
assert len(res.calls) == 1
165164

166165
call = res.calls[0]
167-
export_span = export_req.traces.resource_spans[0].scope_spans[0].spans[0]
166+
export_span = export_req.processed_spans[0].resource_spans.scope_spans[0].spans[0]
168167
decoded_trace = hexlify(export_span.trace_id).decode("ascii")
169168
decoded_span = hexlify(export_span.span_id).decode("ascii")
170169

@@ -213,8 +212,10 @@ def test_otel_export_with_turn_and_thread(client: weave_client.WeaveClient):
213212
export_req.wb_user_id = "abcd123"
214213

215214
# Add turn and thread attributes to the span
215+
# Materialize processed_spans to avoid iterator exhaustion
216216
test_thread_id = str(uuid.uuid4())
217-
span = export_req.traces.resource_spans[0].scope_spans[0].spans[0]
217+
processed_spans_list = export_req.processed_spans
218+
span = processed_spans_list[0].resource_spans.scope_spans[0].spans[0]
218219

219220
kv_is_turn = KeyValue()
220221
kv_is_turn.key = "wandb.is_turn"
@@ -226,9 +227,11 @@ def test_otel_export_with_turn_and_thread(client: weave_client.WeaveClient):
226227
kv_thread.value.string_value = test_thread_id
227228
span.attributes.append(kv_thread)
228229

230+
export_req.processed_spans = processed_spans_list
231+
229232
# Export the otel traces
230233
response = client.server.otel_export(export_req)
231-
assert isinstance(response, tsi.OtelExportRes)
234+
assert isinstance(response, tsi.OTelExportRes)
232235

233236
# Query the calls
234237
res = client.server.calls_query(
@@ -263,16 +266,20 @@ def test_otel_export_with_turn_no_thread(client: weave_client.WeaveClient):
263266
export_req.wb_user_id = "abcd123"
264267

265268
# Add only is_turn attribute (no thread_id)
266-
span = export_req.traces.resource_spans[0].scope_spans[0].spans[0]
269+
# Materialize processed_spans to avoid iterator exhaustion
270+
processed_spans_list = export_req.processed_spans
271+
span = processed_spans_list[0].resource_spans.scope_spans[0].spans[0]
267272

268273
kv_is_turn = KeyValue()
269274
kv_is_turn.key = "wandb.is_turn"
270275
kv_is_turn.value.bool_value = True
271276
span.attributes.append(kv_is_turn)
272277

278+
export_req.processed_spans = processed_spans_list
279+
273280
# Export the otel traces
274281
response = client.server.otel_export(export_req)
275-
assert isinstance(response, tsi.OtelExportRes)
282+
assert isinstance(response, tsi.OTelExportRes)
276283

277284
# Query the calls
278285
res = client.server.calls_query(
@@ -462,8 +469,9 @@ def test_span_to_call_with_turn_and_thread(self):
462469
def test_traces_data_from_proto(self):
463470
"""Test converting protobuf TracesData to Python TracesData."""
464471
export_req = create_test_export_request()
472+
resource_spans_list = [ps.resource_spans for ps in export_req.processed_spans]
465473
traces_data = PyTracesData.from_proto(
466-
TracesData(resource_spans=export_req.traces.resource_spans)
474+
TracesData(resource_spans=resource_spans_list)
467475
)
468476

469477
assert len(traces_data.resource_spans) == 1
@@ -1160,7 +1168,12 @@ def test_opentelemetry_cost_calculation(self, client: weave_client.WeaveClient):
11601168

11611169
# Create export request
11621170
export_req = create_test_export_request(project_id=project_id)
1163-
export_req.traces.resource_spans[0].scope_spans[0].spans[0].CopyFrom(span_gpt4)
1171+
# Materialize processed_spans to avoid iterator exhaustion
1172+
processed_spans_list = export_req.processed_spans
1173+
processed_spans_list[0].resource_spans.scope_spans[0].spans[0].CopyFrom(
1174+
span_gpt4
1175+
)
1176+
export_req.processed_spans = processed_spans_list
11641177
export_req.wb_user_id = "test_user"
11651178

11661179
# Export the trace
@@ -1444,7 +1457,11 @@ def test_otel_export_partial_success_on_attribute_conflict(
14441457
export_req.wb_user_id = "abcd123"
14451458

14461459
# Good span (already present at index 0)
1447-
good_span = export_req.traces.resource_spans[0].scope_spans[0].spans[0]
1460+
# Materialize processed_spans to avoid iterator exhaustion
1461+
processed_spans_list = export_req.processed_spans
1462+
1463+
# Good span (already present at index 0)
1464+
good_span = processed_spans_list[0].resource_spans.scope_spans[0].spans[0]
14481465
good_span_id = hexlify(good_span.span_id).decode("ascii")
14491466

14501467
# Add a conflicting span to the same batch
@@ -1461,12 +1478,13 @@ def test_otel_export_partial_success_on_attribute_conflict(
14611478
kv_child.value.string_value = "Hello"
14621479
bad_span.attributes.append(kv_child)
14631480

1464-
export_req.traces.resource_spans[0].scope_spans[0].spans.append(bad_span)
1481+
processed_spans_list[0].resource_spans.scope_spans[0].spans.append(bad_span)
1482+
export_req.processed_spans = processed_spans_list
14651483
bad_span_id = hexlify(bad_span.span_id).decode("ascii")
14661484

14671485
# Export
14681486
res = client.server.otel_export(export_req)
1469-
assert isinstance(res, tsi.OtelExportRes)
1487+
assert isinstance(res, tsi.OTelExportRes)
14701488
assert res.partial_success is not None
14711489
assert res.partial_success.rejected_spans == 1
14721490
# Error message should mention the conflicting key and guidance
@@ -1522,7 +1540,9 @@ def test_otel_span_wandb_attributes_and_data_routing(
15221540
export_req.wb_user_id = "abcd123"
15231541

15241542
# Get the span to add custom attributes
1525-
span = export_req.traces.resource_spans[0].scope_spans[0].spans[0]
1543+
# Materialize processed_spans to avoid iterator exhaustion
1544+
processed_spans_list = export_req.processed_spans
1545+
span = processed_spans_list[0].resource_spans.scope_spans[0].spans[0]
15261546

15271547
# Clear default test attributes
15281548
del span.attributes[:]
@@ -1591,9 +1611,11 @@ def test_otel_span_wandb_attributes_and_data_routing(
15911611
kv_provider.value.string_value = "openai"
15921612
span.attributes.append(kv_provider)
15931613

1614+
export_req.processed_spans = processed_spans_list
1615+
15941616
# Export the OTEL traces
15951617
response = client.server.otel_export(export_req)
1596-
assert isinstance(response, tsi.OtelExportRes)
1618+
assert isinstance(response, tsi.OTelExportRes)
15971619

15981620
# Query the calls
15991621
res = client.server.calls_query(

weave/trace_server/clickhouse_trace_server_batched.py

Lines changed: 66 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
from clickhouse_connect.driver.httputil import get_pool_manager
2222
from clickhouse_connect.driver.query import QueryResult
2323
from clickhouse_connect.driver.summary import QuerySummary
24-
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
25-
ExportTraceServiceRequest,
26-
)
24+
from opentelemetry.proto.trace.v1.trace_pb2 import ResourceSpans
2725
from tenacity import (
2826
retry,
2927
retry_if_exception_type,
@@ -382,20 +380,23 @@ def _create_or_get_placeholder_ops_digest(
382380
return self.file_create(source_file_req).digest
383381

384382
@ddtrace.tracer.wrap(name="clickhouse_trace_server_batched.otel_export")
385-
def otel_export(self, req: tsi.OtelExportReq) -> tsi.OtelExportRes:
383+
def otel_export(self, req: tsi.OTelExportReq) -> tsi.OTelExportRes:
386384
assert_non_null_wb_user_id(req)
387-
388-
if not isinstance(req.traces, ExportTraceServiceRequest):
389-
raise TypeError(
390-
"Expected traces as ExportTraceServiceRequest, got {type(req.traces)}"
391-
)
392385
calls: list[
393386
tuple[tsi.StartedCallSchemaForInsert, tsi.EndedCallSchemaForInsert]
394387
] = []
395388
rejected_spans = 0
396389
error_messages: list[str] = []
390+
for processed_span in req.processed_spans:
391+
# Extract wb_run_id from the processed span
392+
wb_run_id = processed_span.run_id
393+
394+
if not isinstance(processed_span.resource_spans, ResourceSpans):
395+
raise TypeError(
396+
f"Expected resource_spans as ResourceSpans, got {type(processed_span.resource_spans)}"
397+
)
397398

398-
for proto_resource_spans in req.traces.resource_spans:
399+
proto_resource_spans = processed_span.resource_spans
399400
resource = Resource.from_proto(proto_resource_spans.resource)
400401
for proto_scope_spans in proto_resource_spans.scope_spans:
401402
for proto_span in proto_scope_spans.spans:
@@ -423,69 +424,69 @@ def otel_export(self, req: tsi.OtelExportReq) -> tsi.OtelExportRes:
423424
span.to_call(
424425
req.project_id,
425426
wb_user_id=req.wb_user_id,
426-
wb_run_id=req.wb_run_id,
427+
wb_run_id=wb_run_id,
427428
)
428429
)
429430

430-
obj_id_idx_map = defaultdict(list)
431-
for idx, (start_call, _) in enumerate(calls):
432-
op_name = object_creation_utils.make_safe_name(start_call.op_name)
433-
obj_id_idx_map[op_name].append(idx)
431+
obj_id_idx_map = defaultdict(list)
432+
for idx, (start_call, _) in enumerate(calls):
433+
op_name = object_creation_utils.make_safe_name(start_call.op_name)
434+
obj_id_idx_map[op_name].append(idx)
434435

435-
existing_objects = self._get_existing_ops_from_spans(
436-
seen_ids=set(obj_id_idx_map.keys()),
437-
project_id=req.project_id,
438-
limit=len(calls),
439-
)
440-
# We know that OTel will always use the placeholder source.
441-
# We can instead just reuse the existing file if we know it is present
442-
# and create it just once if we are not sure.
443-
if len(existing_objects) == 0:
444-
digest = self._create_or_get_placeholder_ops_digest(
445-
project_id=req.project_id, create=True
446-
)
447-
else:
448-
digest = self._create_or_get_placeholder_ops_digest(
449-
project_id=req.project_id, create=False
436+
existing_objects = self._get_existing_ops_from_spans(
437+
seen_ids=set(obj_id_idx_map.keys()),
438+
project_id=req.project_id,
439+
limit=len(calls),
450440
)
441+
# We know that OTel will always use the placeholder source.
442+
# We can instead just reuse the existing file if we know it is present
443+
# and create it just once if we are not sure.
444+
if len(existing_objects) == 0:
445+
digest = self._create_or_get_placeholder_ops_digest(
446+
project_id=req.project_id, create=True
447+
)
448+
else:
449+
digest = self._create_or_get_placeholder_ops_digest(
450+
project_id=req.project_id, create=False
451+
)
451452

452-
for obj in existing_objects:
453-
op_ref_uri = ri.InternalOpRef(
454-
project_id=req.project_id,
455-
name=obj.object_id,
456-
version=obj.digest,
457-
).uri()
458-
459-
# Modify each of the matched start calls in place
460-
for idx in obj_id_idx_map[obj.object_id]:
461-
calls[idx][0].op_name = op_ref_uri
462-
# Remove this ID from the mapping so that once the for loop is done we are left with only new objects
463-
obj_id_idx_map.pop(obj.object_id)
464-
465-
obj_creation_batch = []
466-
for op_obj_id in obj_id_idx_map.keys():
467-
op_val = object_creation_utils.build_op_val(digest)
468-
obj_creation_batch.append(
469-
tsi.ObjSchemaForInsert(
453+
for obj in existing_objects:
454+
op_ref_uri = ri.InternalOpRef(
470455
project_id=req.project_id,
471-
object_id=op_obj_id,
472-
val=op_val,
473-
wb_user_id=req.wb_user_id,
456+
name=obj.object_id,
457+
version=obj.digest,
458+
).uri()
459+
460+
# Modify each of the matched start calls in place
461+
for idx in obj_id_idx_map[obj.object_id]:
462+
calls[idx][0].op_name = op_ref_uri
463+
# Remove this ID from the mapping so that once the for loop is done we are left with only new objects
464+
obj_id_idx_map.pop(obj.object_id)
465+
466+
obj_creation_batch = []
467+
for op_obj_id in obj_id_idx_map.keys():
468+
op_val = object_creation_utils.build_op_val(digest)
469+
obj_creation_batch.append(
470+
tsi.ObjSchemaForInsert(
471+
project_id=req.project_id,
472+
object_id=op_obj_id,
473+
val=op_val,
474+
wb_user_id=req.wb_user_id,
475+
)
474476
)
475-
)
476-
res = self.obj_create_batch(obj_creation_batch)
477+
res = self.obj_create_batch(obj_creation_batch)
477478

478-
for result in res:
479-
if result.object_id is None:
480-
raise RuntimeError("Otel Export - Expected object_id but got None")
479+
for result in res:
480+
if result.object_id is None:
481+
raise RuntimeError("Otel Export - Expected object_id but got None")
481482

482-
op_ref_uri = ri.InternalOpRef(
483-
project_id=req.project_id,
484-
name=result.object_id,
485-
version=result.digest,
486-
).uri()
487-
for idx in obj_id_idx_map[result.object_id]:
488-
calls[idx][0].op_name = op_ref_uri
483+
op_ref_uri = ri.InternalOpRef(
484+
project_id=req.project_id,
485+
name=result.object_id,
486+
version=result.digest,
487+
).uri()
488+
for idx in obj_id_idx_map[result.object_id]:
489+
calls[idx][0].op_name = op_ref_uri
489490

490491
write_target = self.table_routing_resolver.resolve_v2_write_target(
491492
req.project_id,
@@ -531,13 +532,13 @@ def otel_export(self, req: tsi.OtelExportReq) -> tsi.OtelExportRes:
531532
joined_errors = "; ".join(error_messages[:20]) + (
532533
"; ..." if len(error_messages) > 20 else ""
533534
)
534-
return tsi.OtelExportRes(
535+
return tsi.OTelExportRes(
535536
partial_success=tsi.ExportTracePartialSuccess(
536537
rejected_spans=rejected_spans,
537538
error_message=joined_errors,
538539
)
539540
)
540-
return tsi.OtelExportRes()
541+
return tsi.OTelExportRes()
541542

542543
@ddtrace.tracer.wrap(name="clickhouse_trace_server_batched.kafka_producer.flush")
543544
def _flush_kafka_producer(self) -> None:

0 commit comments

Comments
 (0)