Commit ff7f60e

vasantteja and aabmass authored
feat: Add sync streaming support for Anthropic instrumentation (#4155)
* Add sync streaming support for Anthropic instrumentation
  - Add support for Messages.create(stream=True) with StreamWrapper
  - Add support for Messages.stream() with MessageStreamManagerWrapper
  - Add MessageWrapper for non-streaming response telemetry
  - Rename MessageCreateParams to MessageRequestParams
  - Add comprehensive tests for sync streaming functionality
* Add changelog entry for sync streaming support
* Fix type checking errors with type: ignore comments
  - Add type: ignore[arg-type] for Union type narrowing in messages_create
  - Add type: ignore[return-value] for wrapper return types
  - Add type: ignore[return-value] for __exit__ returning None
* Refactor Anthropic instrumentation to improve usage tracking and error handling
  - Introduce constants for provider name and cache token attributes.
  - Normalize stop reasons and aggregate cache token fields in MessageWrapper and StreamWrapper.
  - Enhance tests to validate input token aggregation and stop reason normalization.
  - Update cassettes for new request and response structures in streaming scenarios.
* Refactor utility functions and test cases for improved readability and consistency
  - Simplify constant definitions and normalize function calls in utils.py.
  - Enhance test cases by removing unnecessary line breaks and improving formatting.
  - Ensure consistent usage of type hints and comments in test functions.
* Refactor argument handling in assert_span_attributes function
  - Update the pylint directive to disable too-many-arguments warning for better clarity.
  - Maintain consistency in function signature and improve code readability.
* Enhance tests for streaming message handling in Anthropic instrumentation
  - Update test cases to validate streaming behavior with various parameters, including token usage and stop reasons.
  - Introduce new cassettes for different scenarios, ensuring comprehensive coverage of streaming interactions.
  - Refactor existing tests for clarity and consistency in structure and assertions.
* Update test_sync_messages.py to disable pylint warning for too-many-locals in test_stream_wrapper_finalize_idempotent function
* Enhance StreamWrapper and MessageStreamManagerWrapper for idempotent finalization
  - Refactor finalization logic in StreamWrapper and MessageStreamManagerWrapper to ensure idempotent behavior during context exit.
  - Introduce new methods for successful and error finalization, improving clarity and reducing code duplication.
  - Add tests to validate double exit idempotency in streaming scenarios, ensuring only one span is emitted.
  - Update cassettes to reflect new request and response structures for streaming interactions.
* Enhance Anthropic instrumentation to support content capture
  - Added logger_provider to TelemetryHandler for improved logging capabilities.
  - Implemented content capture logic in messages_create and messages_stream functions, allowing for the extraction of input messages and system instructions.
  - Introduced utility functions for content conversion and message handling in utils.py.
  - Updated tests to validate content capture functionality for both synchronous and streaming message creation.
  - Added new cassettes to reflect the changes in request and response structures for content capture scenarios.
* Enhance tests for sync message creation in Anthropic instrumentation
  - Added checks for the presence of 'tools' and 'thinking' parameters in the installed anthropic SDK.
  - Updated test cases to skip if the SDK version does not support these parameters, ensuring compatibility with older versions.
  - Improved test robustness by dynamically determining parameter support.
* Remove sensitive 'anthropic-organization-id' headers from test cassettes and update header scrubbing logic in tests. This enhances security by ensuring sensitive information is not recorded in test artifacts.
* Refactor tests for sync message handling in Anthropic instrumentation
  - Simplified detection of 'tools' and 'thinking' parameters by directly accessing the _Messages class.
  - Improved readability of test cases by formatting input message loading.
  - Enhanced test function signatures for better clarity and maintainability.
* Refactor utils.py for improved type safety and clarity
  - Added type casting for dictionary access to enhance type safety.
  - Simplified content block conversion logic to improve readability and maintainability.
  - Updated test cases to ensure consistent handling of content types and structures.
* Enhance Anthropic instrumentation tests for EVENT_ONLY content capture
  - Introduced a new fixture to instrument Anthropic with EVENT_ONLY content capture mode.
  - Added tests to verify that content is not captured in span attributes while ensuring log events are emitted correctly.
  - Updated cassettes to reflect new request and response structures for EVENT_ONLY scenarios.
  - Enhanced existing tests to cover various content capture scenarios, including streaming and tool usage.
* Refactor assertion in sync messages test for clarity
  - Simplified the assertion statement in the test_sync_messages_create_event_only_no_content_in_span function to improve readability.
* Refactor content capture logic and enhance streaming tests for Anthropic instrumentation.
* unsetting the model.
* Remove instrumentation for Messages.stream() and refactor related code. Introduced MessageWrapper and StreamWrapper classes for telemetry handling. Updated tests to reflect changes in instrumentation behavior.
* Refactor Anthropic instrumentation: reorganize imports, enhance utility functions, and update wrapper classes for better clarity and maintainability. Removed unused code and improved type safety in utility functions. Updated tests to reflect changes in the instrumentation behavior.
* Add message extractors for Anthropic instrumentation.
* Refactor message extractors in Anthropic instrumentation: reorganize imports and streamline finish reason normalization for improved clarity and maintainability.
* Update test cassettes for Anthropic instrumentation: streamline request and response structures, enhance error handling scenarios, and ensure consistency in message formats across various test cases. Removed outdated data and improved clarity in test interactions.
* Enhance Anthropic instrumentation: update MessageWrapper and StreamWrapper to include content capture logic, improve type safety with explicit casting, and streamline test cases for better clarity. Added new test for streaming response attributes and refined existing tests to ensure consistency in message handling.
* Update test cassettes for Anthropic instrumentation: modify message IDs, timestamps, and token usage across various test cases. Refine content capture logic and ensure consistency in message formats, including adjustments to event data and headers for improved clarity and accuracy.
* Rename StreamWrapper to MessagesStreamWrapper and update references in code and tests
* Refactor type annotations in message extractors and wrappers for improved type safety. Replace 'Any' with 'object' in several function signatures and class attributes. Introduce logging for error handling in MessagesStreamWrapper to enhance instrumentation reliability.
* Enhance type annotations in message extractors and patch for improved clarity and safety. Update function signatures to use specific types instead of 'object', including changes to parameters in extract_params, get_input_messages, and get_system_instruction. Refactor messages_create to ensure correct type handling for streaming and non-streaming responses. Additionally, streamline message handling in MessagesStreamWrapper for better performance and reliability.
* Enhance type safety and error handling in message processing. Update function signatures in `messages_extractors.py` and `wrappers.py` to include specific types, improving clarity and reliability. Introduce handling for `None` values in `get_input_messages` and `get_system_instruction`. Refactor `MessagesStreamWrapper` to better manage usage updates and ensure correct type handling for streaming responses. Add new test cases for aggregating cache tokens and handling streaming errors.
* Refactor assertions in test_sync_messages.py for improved readability. Simplify assertion statements by removing unnecessary parentheses, enhancing code clarity in cache token tests.
* enforce strong typing system.
* Update anthropic dependency version to 0.51.0 in pyproject.toml and requirements.oldest.txt for compatibility improvements.
* Refactor usage token extraction to utilize a new UsageTokens dataclass for improved clarity and type safety. Update extract_usage_tokens function to return UsageTokens instead of a tuple, and adjust related invocations in MessageWrapper and MessagesStreamWrapper accordingly.
* Update anthropic dependency version in uv.lock to 0.51.0 for compatibility improvements.
* Add tests for should_capture_content function in test_events_options.py.
* Enhance Anthropic instrumentation by adding logging support and refining type hints in messages_create function. Update test cassettes for improved accuracy and consistency in response data.
* Refactor content capturing utility function to clarify its purpose in experimental mode. Update related tests to reflect the new function name and ensure accurate assertions for content capturing behavior.
* Refactor import statements in patch.py for improved readability and organization.

---------

Co-authored-by: Aaron Abbott <[email protected]>
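The commit message describes idempotent finalization: exiting a stream wrapper twice should emit only one span. The wrapper source is not shown on this page, so the following is only a minimal sketch of that idea. The class name `IdempotentStream` and the `on_finish` callback are hypothetical stand-ins, not the commit's `MessagesStreamWrapper` or the `TelemetryHandler` API.

```python
from typing import Callable, Iterator, Optional


class IdempotentStream:
    """Hypothetical wrapper that reports completion exactly once."""

    def __init__(
        self,
        stream: Iterator[str],
        on_finish: Callable[[Optional[BaseException]], None],
    ) -> None:
        self._stream = iter(stream)
        self._on_finish = on_finish  # stand-in for "end the span"
        self._finalized = False

    def _finalize(self, error: Optional[BaseException] = None) -> None:
        # Guard flag: a second call (double exit, exit after exhaustion)
        # returns early, so telemetry is emitted at most once.
        if self._finalized:
            return
        self._finalized = True
        self._on_finish(error)

    def __iter__(self) -> "IdempotentStream":
        return self

    def __next__(self) -> str:
        try:
            return next(self._stream)
        except StopIteration:
            self._finalize()  # normal end of stream
            raise
        except Exception as exc:
            self._finalize(exc)  # error while streaming
            raise

    def __enter__(self) -> "IdempotentStream":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self._finalize(exc)
        return None  # never suppress the caller's exception
```

With this shape, consuming the stream to the end and then leaving the `with` block calls `_finalize` twice, but `on_finish` runs only for the first call.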
1 parent 578373a commit ff7f60e

32 files changed, +3684 -2514 lines changed

instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Add sync streaming support for `Messages.create(stream=True)` and `Messages.stream()`
+  ([#4155](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4155))
+  - `StreamWrapper` for handling `Messages.create(stream=True)` telemetry
+  - `MessageStreamManagerWrapper` for handling `Messages.stream()` telemetry
+  - `MessageWrapper` for non-streaming response telemetry extraction
 - Initial implementation of Anthropic instrumentation
   ([#3978](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3978))
 - Implement sync `Messages.create` instrumentation with GenAI semantic convention attributes

instrumentation-genai/opentelemetry-instrumentation-anthropic/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ dependencies = [
 
 [project.optional-dependencies]
 instruments = [
-  "anthropic >= 0.16.0",
+  "anthropic >= 0.51.0",
 ]
 
 [project.entry-points.opentelemetry_instrumentor]

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py

Lines changed: 5 additions & 2 deletions
@@ -54,7 +54,9 @@
 )
 
 from opentelemetry.instrumentation.anthropic.package import _instruments
-from opentelemetry.instrumentation.anthropic.patch import messages_create
+from opentelemetry.instrumentation.anthropic.patch import (
+    messages_create,
+)
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
 from opentelemetry.instrumentation.utils import unwrap
 from opentelemetry.util.genai.handler import TelemetryHandler
@@ -89,11 +91,12 @@ def _instrument(self, **kwargs: Any) -> None:
         # Get providers from kwargs
         tracer_provider = kwargs.get("tracer_provider")
         meter_provider = kwargs.get("meter_provider")
+        logger_provider = kwargs.get("logger_provider")
 
-        # TODO: Add logger_provider to TelemetryHandler to capture content events.
         handler = TelemetryHandler(
             tracer_provider=tracer_provider,
             meter_provider=meter_provider,
+            logger_provider=logger_provider,
         )
 
         # Patch Messages.create
@@ -0,0 +1,219 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Get/extract helpers for Anthropic Messages instrumentation."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Sequence
+
+from anthropic.types import MessageDeltaUsage
+
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAIAttributes,
+)
+from opentelemetry.semconv._incubating.attributes import (
+    server_attributes as ServerAttributes,
+)
+from opentelemetry.util.genai.types import (
+    InputMessage,
+    MessagePart,
+    OutputMessage,
+)
+from opentelemetry.util.types import AttributeValue
+
+from .utils import (
+    convert_content_to_parts,
+    normalize_finish_reason,
+)
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Mapping
+
+    import httpx
+    from anthropic.resources.messages import Messages
+    from anthropic.types import (
+        Message,
+        MessageParam,
+        MetadataParam,
+        TextBlockParam,
+        ThinkingConfigParam,
+        ToolChoiceParam,
+        ToolUnionParam,
+        Usage,
+    )
+
+
+@dataclass
+class MessageRequestParams:
+    model: str | None = None
+    max_tokens: int | None = None
+    temperature: float | None = None
+    top_k: int | None = None
+    top_p: float | None = None
+    stop_sequences: Sequence[str] | None = None
+    stream: bool | None = None
+    messages: Iterable[MessageParam] | None = None
+    system: str | Iterable[TextBlockParam] | None = None
+
+
+GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS = (
+    "gen_ai.usage.cache_creation.input_tokens"
+)
+GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens"
+
+
+@dataclass
+class UsageTokens:
+    input_tokens: int | None = None
+    output_tokens: int | None = None
+    cache_creation_input_tokens: int | None = None
+    cache_read_input_tokens: int | None = None
+
+
+def extract_usage_tokens(
+    usage: Usage | MessageDeltaUsage | None,
+) -> UsageTokens:
+    if usage is None:
+        return UsageTokens()
+
+    input_tokens = usage.input_tokens
+    output_tokens = usage.output_tokens
+    cache_creation_input_tokens = usage.cache_creation_input_tokens
+    cache_read_input_tokens = usage.cache_read_input_tokens
+
+    if (
+        input_tokens is None
+        and cache_creation_input_tokens is None
+        and cache_read_input_tokens is None
+    ):
+        total_input_tokens = None
+    else:
+        total_input_tokens = (
+            (input_tokens or 0)
+            + (cache_creation_input_tokens or 0)
+            + (cache_read_input_tokens or 0)
+        )
+
+    return UsageTokens(
+        input_tokens=total_input_tokens,
+        output_tokens=output_tokens,
+        cache_creation_input_tokens=cache_creation_input_tokens,
+        cache_read_input_tokens=cache_read_input_tokens,
+    )
+
+
+def get_input_messages(
+    messages: Iterable[MessageParam] | None,
+) -> list[InputMessage]:
+    if messages is None:
+        return []
+    result: list[InputMessage] = []
+    for message in messages:
+        role = message["role"]
+        parts = convert_content_to_parts(message["content"])
+        result.append(InputMessage(role=role, parts=parts))
+    return result
+
+
+def get_system_instruction(
+    system: str | Iterable[TextBlockParam] | None,
+) -> list[MessagePart]:
+    if system is None:
+        return []
+    return convert_content_to_parts(system)
+
+
+def get_output_messages_from_message(
+    message: Message | None,
+) -> list[OutputMessage]:
+    if message is None:
+        return []
+
+    parts = convert_content_to_parts(message.content)
+    finish_reason = normalize_finish_reason(message.stop_reason)
+    return [
+        OutputMessage(
+            role=message.role,
+            parts=parts,
+            finish_reason=finish_reason or "",
+        )
+    ]
+
+
+def extract_params(  # pylint: disable=too-many-locals
+    *,
+    max_tokens: int | None = None,
+    messages: Iterable[MessageParam] | None = None,
+    model: str | None = None,
+    metadata: MetadataParam | None = None,
+    service_tier: str | None = None,
+    stop_sequences: Sequence[str] | None = None,
+    stream: bool | None = None,
+    system: str | Iterable[TextBlockParam] | None = None,
+    temperature: float | None = None,
+    thinking: ThinkingConfigParam | None = None,
+    tool_choice: ToolChoiceParam | None = None,
+    tools: Iterable[ToolUnionParam] | None = None,
+    top_k: int | None = None,
+    top_p: float | None = None,
+    extra_headers: Mapping[str, str] | None = None,
+    extra_query: Mapping[str, object] | None = None,
+    extra_body: object | None = None,
+    timeout: float | httpx.Timeout | None = None,
+    **_kwargs: object,
+) -> MessageRequestParams:
+    return MessageRequestParams(
+        model=model,
+        max_tokens=max_tokens,
+        temperature=temperature,
+        top_p=top_p,
+        top_k=top_k,
+        stop_sequences=stop_sequences,
+        stream=stream,
+        messages=messages,
+        system=system,
+    )
+
+
+def _set_server_address_and_port(
+    client_instance: "Messages",
+    attributes: dict[str, AttributeValue | None],
+) -> None:
+    base_url = client_instance._client.base_url
+    host = base_url.host
+    if host:
+        attributes[ServerAttributes.SERVER_ADDRESS] = host
+
+    port = base_url.port
+    if port and port != 443 and port > 0:
+        attributes[ServerAttributes.SERVER_PORT] = port
+
+
+def get_llm_request_attributes(
+    params: MessageRequestParams, client_instance: "Messages"
+) -> dict[str, AttributeValue]:
+    attributes: dict[str, AttributeValue | None] = {
+        GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value,
+        GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.ANTHROPIC.value,  # pyright: ignore[reportDeprecated]
+        GenAIAttributes.GEN_AI_REQUEST_MODEL: params.model,
+        GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS: params.max_tokens,
+        GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE: params.temperature,
+        GenAIAttributes.GEN_AI_REQUEST_TOP_P: params.top_p,
+        GenAIAttributes.GEN_AI_REQUEST_TOP_K: params.top_k,
+        GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES: params.stop_sequences,
+    }
+    _set_server_address_and_port(client_instance, attributes)
+    return {k: v for k, v in attributes.items() if v is not None}
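The `extract_usage_tokens` helper in the file above reports input tokens as the sum of the uncached, cache-creation, and cache-read counts, and returns `None` only when all three are missing. The sketch below walks through that arithmetic with a stand-in usage object. It is an illustration under assumptions: `summarize_usage` is a hypothetical name, `SimpleNamespace` replaces the SDK's usage types, and `getattr` defaults are used so that the stand-in may omit fields.

```python
from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class UsageTokens:
    input_tokens: int | None = None
    output_tokens: int | None = None
    cache_creation_input_tokens: int | None = None
    cache_read_input_tokens: int | None = None


def summarize_usage(usage: object | None) -> UsageTokens:
    """Hypothetical re-statement of the aggregation rule in the diff."""
    if usage is None:
        return UsageTokens()

    uncached = getattr(usage, "input_tokens", None)
    created = getattr(usage, "cache_creation_input_tokens", None)
    read = getattr(usage, "cache_read_input_tokens", None)

    parts = (uncached, created, read)
    if all(part is None for part in parts):
        total = None  # nothing reported, so do not invent a zero
    else:
        total = sum(part or 0 for part in parts)

    return UsageTokens(
        input_tokens=total,
        output_tokens=getattr(usage, "output_tokens", None),
        cache_creation_input_tokens=created,
        cache_read_input_tokens=read,
    )


# Stand-in for a usage payload: 10 uncached + 5 cache-creation + 85 cache-read.
example = SimpleNamespace(
    input_tokens=10,
    output_tokens=7,
    cache_creation_input_tokens=5,
    cache_read_input_tokens=85,
)
summary = summarize_usage(example)
```

Here `summary.input_tokens` is 100 while the two cache fields keep their individual values, so a consumer can still separate cached from uncached input.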

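`get_llm_request_attributes` builds a dictionary that may contain `None` values, adds the server address, adds the port only when it is not 443, and then drops every `None` entry. A rough standalone sketch of that pattern follows. It makes several assumptions: `build_request_attributes` is a hypothetical name, `urllib.parse.urlsplit` stands in for the client's `base_url` object, and the attribute keys are written as plain strings rather than imported from the semantic-conventions package.

```python
from __future__ import annotations

from urllib.parse import urlsplit


def build_request_attributes(
    base_url: str,
    *,
    model: str | None = None,
    max_tokens: int | None = None,
    temperature: float | None = None,
) -> dict[str, object]:
    """Hypothetical sketch: collect attributes, then drop unset ones."""
    attributes: dict[str, object | None] = {
        "gen_ai.operation.name": "chat",
        "gen_ai.request.model": model,
        "gen_ai.request.max_tokens": max_tokens,
        "gen_ai.request.temperature": temperature,
    }

    parsed = urlsplit(base_url)
    if parsed.hostname:
        attributes["server.address"] = parsed.hostname

    # Mirror the diff: the default HTTPS port is left out of the span.
    port = parsed.port
    if port and port != 443:
        attributes["server.port"] = port

    # Unset request parameters never reach the exporter.
    return {key: value for key, value in attributes.items() if value is not None}


default_port = build_request_attributes(
    "https://api.anthropic.com", model="example-model", max_tokens=100
)
custom_port = build_request_attributes("http://localhost:8080", model="example-model")
```

`default_port` carries no `server.port` or `gen_ai.request.temperature` key, while `custom_port` records port 8080. Filtering at the end keeps the construction code free of per-attribute `if` checks.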