Skip to content

Commit b06fc4e

Browse files
pymilvus-bothaorenfsaclaude
authored
[Backport 2.6] enhance: add channel_state and debug_error_string to gRPC error diagnostics (#3243) (#3249)
Backport of #3243 to `2.6`. Signed-off-by: haorenfsa <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: shaoyue <[email protected]> Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 64de51d commit b06fc4e

File tree

2 files changed

+312
-9
lines changed

2 files changed

+312
-9
lines changed

pymilvus/decorators.py

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,50 @@
2020
LOGGER = logging.getLogger(__name__)
2121
WARNING_COLOR = "\033[93m{}\033[0m"
2222

23+
# gRPC connectivity state: int -> enum mapping
24+
_CONNECTIVITY_INT_TO_ENUM = {state.value[0]: state for state in grpc.ChannelConnectivity}
25+
26+
27+
def _get_rpc_error_info(e: grpc.RpcError, channel: grpc.Channel = None) -> str:
28+
"""Extract full error info from gRPC error including debug_error_string and channel state.
29+
30+
Args:
31+
e: The gRPC RpcError exception
32+
channel: Optional gRPC channel to get connectivity state from
33+
"""
34+
parts = [f"{e.code()}", e.details() or ""]
35+
36+
# Include channel connectivity state for better diagnostics
37+
# This helps distinguish between connection-level vs application-level timeouts
38+
if channel is not None:
39+
try:
40+
state = channel._channel.check_connectivity_state(False)
41+
# Handle both enum and integer return types
42+
if isinstance(state, int):
43+
state = _CONNECTIVITY_INT_TO_ENUM.get(state)
44+
state_name = state.name if state else str(state)
45+
parts.append(f"channel_state={state_name}")
46+
except Exception: # noqa: S110
47+
pass
48+
49+
# Append debug_error_string for TCP-level diagnostics
50+
if hasattr(e, "debug_error_string"):
51+
try:
52+
debug_str = e.debug_error_string()
53+
if debug_str:
54+
parts.append(f"debug={debug_str}")
55+
except Exception: # noqa: S110
56+
pass
57+
58+
return ", ".join(p for p in parts if p)
59+
60+
61+
def _try_get_channel(args: tuple) -> grpc.Channel:
62+
"""Try to get channel from the first argument (self) if it's a GrpcHandler."""
63+
if args and hasattr(args[0], "_channel"):
64+
return args[0]._channel
65+
return None
66+
2367

2468
def retry_on_schema_mismatch():
2569
"""
@@ -151,6 +195,7 @@ async def async_handler(*args, **kwargs):
151195
counter = 1
152196
back_off = initial_back_off
153197
start_time = time.time()
198+
channel = _try_get_channel(args)
154199

155200
def is_timeout(start_time: Optional[float] = None) -> bool:
156201
if retry_timeout is not None:
@@ -171,15 +216,14 @@ def is_timeout(start_time: Optional[float] = None) -> bool:
171216
raise e from e
172217
if is_timeout(start_time):
173218
raise MilvusException(
174-
e.code(), f"{to_msg}, message={e.details()}"
219+
e.code(), f"{to_msg}, {_get_rpc_error_info(e, channel)}"
175220
) from e
176221

177222
if counter > 3:
178-
retry_msg = (
223+
LOGGER.info(
179224
f"[{func.__name__}] retry:{counter}, cost: {back_off:.2f}s, "
180-
f"reason: <{e.__class__.__name__}: {e.code()}, {e.details()}>"
225+
f"reason: <{_get_rpc_error_info(e, channel)}>"
181226
)
182-
LOGGER.info(retry_msg)
183227

184228
await asyncio.sleep(back_off)
185229
back_off = min(back_off * back_off_multiplier, max_back_off)
@@ -225,6 +269,7 @@ def handler(*args, **kwargs):
225269
counter = 1
226270
back_off = initial_back_off
227271
start_time = time.time()
272+
channel = _try_get_channel(args)
228273

229274
def timeout(start_time: Optional[float] = None) -> bool:
230275
"""If timeout is valid, use timeout as the retry limits,
@@ -248,15 +293,15 @@ def timeout(start_time: Optional[float] = None) -> bool:
248293
if e.code() in IGNORE_RETRY_CODES:
249294
raise e from e
250295
if timeout(start_time):
251-
raise MilvusException(e.code, f"{to_msg}, message={e.details()}") from e
296+
raise MilvusException(
297+
e.code, f"{to_msg}, {_get_rpc_error_info(e, channel)}"
298+
) from e
252299

253300
if counter > 3:
254-
retry_msg = (
301+
LOGGER.info(
255302
f"[{func.__name__}] retry:{counter}, cost: {back_off:.2f}s, "
256-
f"reason: <{e.__class__.__name__}: {e.code()}, {e.details()}>"
303+
f"reason: <{_get_rpc_error_info(e, channel)}>"
257304
)
258-
# retry msg uses info level
259-
LOGGER.info(retry_msg)
260305

261306
time.sleep(back_off)
262307
back_off = min(back_off * back_off_multiplier, max_back_off)
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
"""
2+
Tests for gRPC error diagnostics: debug_error_string and channel_state.
3+
4+
Unit tests use mocks and run quickly.
5+
Integration tests require network access and are marked with @pytest.mark.integration.
6+
"""
7+
8+
from typing import Optional
9+
10+
import grpc
11+
import pytest
12+
from pymilvus.decorators import _CONNECTIVITY_INT_TO_ENUM, _get_rpc_error_info, _try_get_channel
13+
14+
15+
class MockRpcError(grpc.RpcError):
16+
"""Mock gRPC RpcError for unit testing."""
17+
18+
def __init__(
19+
self,
20+
code: grpc.StatusCode = grpc.StatusCode.UNAVAILABLE,
21+
details: str = "mock error details",
22+
debug_error_string: Optional[str] = None,
23+
):
24+
self._code = code
25+
self._details = details
26+
self._debug_error_string = debug_error_string
27+
28+
def code(self):
29+
return self._code
30+
31+
def details(self):
32+
return self._details
33+
34+
def debug_error_string(self):
35+
return self._debug_error_string
36+
37+
38+
class MockChannel:
39+
"""Mock gRPC channel for unit testing."""
40+
41+
def __init__(self, state: grpc.ChannelConnectivity = grpc.ChannelConnectivity.READY):
42+
self._channel = self
43+
self._state = state
44+
45+
def check_connectivity_state(self, try_to_connect: bool):
46+
return self._state # Return enum value like real gRPC
47+
48+
49+
class MockGrpcHandler:
50+
"""Mock GrpcHandler with _channel attribute."""
51+
52+
def __init__(self, channel_state: grpc.ChannelConnectivity = grpc.ChannelConnectivity.READY):
53+
self._channel = MockChannel(channel_state)
54+
55+
56+
class TestGetRpcErrorInfo:
57+
"""Unit tests for _get_rpc_error_info function."""
58+
59+
def test_basic_error_info(self):
60+
"""Test that basic error info includes code and details."""
61+
error = MockRpcError(
62+
code=grpc.StatusCode.UNAVAILABLE,
63+
details="connection refused",
64+
)
65+
result = _get_rpc_error_info(error)
66+
67+
assert "StatusCode.UNAVAILABLE" in result
68+
assert "connection refused" in result
69+
70+
def test_includes_debug_error_string(self):
71+
"""Test that debug_error_string is included when available."""
72+
error = MockRpcError(
73+
code=grpc.StatusCode.UNAVAILABLE,
74+
details="connection refused",
75+
debug_error_string='{"grpc_status":14,"grpc_message":"Connection refused"}',
76+
)
77+
result = _get_rpc_error_info(error)
78+
79+
assert "debug=" in result
80+
assert "Connection refused" in result
81+
82+
def test_handles_missing_debug_error_string(self):
83+
"""Test graceful handling when debug_error_string is None."""
84+
error = MockRpcError(
85+
code=grpc.StatusCode.UNAVAILABLE,
86+
details="some error",
87+
debug_error_string=None,
88+
)
89+
result = _get_rpc_error_info(error)
90+
91+
assert "StatusCode.UNAVAILABLE" in result
92+
assert "debug=" not in result
93+
94+
def test_includes_channel_state_when_provided(self):
95+
"""Test that channel_state is included when channel is provided."""
96+
error = MockRpcError(code=grpc.StatusCode.DEADLINE_EXCEEDED)
97+
channel = MockChannel(state=grpc.ChannelConnectivity.CONNECTING)
98+
99+
result = _get_rpc_error_info(error, channel)
100+
101+
assert "channel_state=CONNECTING" in result
102+
103+
def test_channel_state_ready(self):
104+
"""Test channel_state=READY for connected channel."""
105+
error = MockRpcError(code=grpc.StatusCode.DEADLINE_EXCEEDED)
106+
channel = MockChannel(state=grpc.ChannelConnectivity.READY)
107+
108+
result = _get_rpc_error_info(error, channel)
109+
110+
assert "channel_state=READY" in result
111+
112+
def test_channel_state_transient_failure(self):
113+
"""Test channel_state=TRANSIENT_FAILURE for failed connection."""
114+
error = MockRpcError(code=grpc.StatusCode.UNAVAILABLE)
115+
channel = MockChannel(state=grpc.ChannelConnectivity.TRANSIENT_FAILURE)
116+
117+
result = _get_rpc_error_info(error, channel)
118+
119+
assert "channel_state=TRANSIENT_FAILURE" in result
120+
121+
def test_no_channel_state_when_channel_is_none(self):
122+
"""Test that channel_state is not included when channel is None."""
123+
error = MockRpcError(code=grpc.StatusCode.UNAVAILABLE)
124+
125+
result = _get_rpc_error_info(error, channel=None)
126+
127+
assert "channel_state=" not in result
128+
129+
def test_full_error_info_format(self):
130+
"""Test complete error info with all components."""
131+
error = MockRpcError(
132+
code=grpc.StatusCode.DEADLINE_EXCEEDED,
133+
details="Deadline Exceeded",
134+
debug_error_string='{"grpc_status":4}',
135+
)
136+
channel = MockChannel(state=grpc.ChannelConnectivity.CONNECTING)
137+
138+
result = _get_rpc_error_info(error, channel)
139+
140+
assert "StatusCode.DEADLINE_EXCEEDED" in result
141+
assert "Deadline Exceeded" in result
142+
assert "channel_state=CONNECTING" in result
143+
assert "debug=" in result
144+
145+
146+
class TestTryGetChannel:
147+
"""Unit tests for _try_get_channel function."""
148+
149+
def test_extracts_channel_from_grpc_handler(self):
150+
"""Test that channel is extracted from GrpcHandler-like object."""
151+
handler = MockGrpcHandler(channel_state=grpc.ChannelConnectivity.READY)
152+
153+
channel = _try_get_channel((handler,))
154+
155+
assert channel is not None
156+
assert channel._state == grpc.ChannelConnectivity.READY
157+
158+
def test_returns_none_for_empty_args(self):
159+
"""Test that None is returned when args is empty."""
160+
channel = _try_get_channel(())
161+
162+
assert channel is None
163+
164+
def test_returns_none_when_no_channel_attribute(self):
165+
"""Test that None is returned when object has no _channel."""
166+
167+
class NoChannelObject:
168+
pass
169+
170+
channel = _try_get_channel((NoChannelObject(),))
171+
172+
assert channel is None
173+
174+
175+
class TestConnectivityStateMapping:
176+
"""Unit tests for connectivity state mapping."""
177+
178+
def test_all_states_have_mapping(self):
179+
"""Test that all connectivity states have int-to-enum mapping."""
180+
assert _CONNECTIVITY_INT_TO_ENUM[0] == grpc.ChannelConnectivity.IDLE
181+
assert _CONNECTIVITY_INT_TO_ENUM[1] == grpc.ChannelConnectivity.CONNECTING
182+
assert _CONNECTIVITY_INT_TO_ENUM[2] == grpc.ChannelConnectivity.READY
183+
assert _CONNECTIVITY_INT_TO_ENUM[3] == grpc.ChannelConnectivity.TRANSIENT_FAILURE
184+
assert _CONNECTIVITY_INT_TO_ENUM[4] == grpc.ChannelConnectivity.SHUTDOWN
185+
assert len(_CONNECTIVITY_INT_TO_ENUM) == 5
186+
187+
188+
@pytest.mark.integration
189+
class TestGrpcErrorDiagnosticsIntegration:
190+
"""
191+
Integration tests that make real gRPC connections.
192+
These tests require network access and may be slow.
193+
194+
Run with: pytest tests/test_grpc_error_diagnostics.py -m integration
195+
"""
196+
197+
def _test_connection(self, uri: str, timeout: float = 3.0):
198+
"""Helper to test a connection and return error info."""
199+
channel = None
200+
try:
201+
addr = uri.replace("http://", "").replace("https://", "")
202+
if uri.startswith("https://"):
203+
channel = grpc.secure_channel(
204+
addr,
205+
grpc.ssl_channel_credentials(),
206+
options=[("grpc.enable_retries", 0)],
207+
)
208+
else:
209+
channel = grpc.insecure_channel(addr, options=[("grpc.enable_retries", 0)])
210+
211+
method = channel.unary_unary(
212+
"/test/Method",
213+
request_serializer=lambda x: b"",
214+
response_deserializer=lambda x: x,
215+
)
216+
method(b"", timeout=timeout)
217+
except grpc.RpcError as e:
218+
error_info = _get_rpc_error_info(e, channel)
219+
return e, error_info
220+
else:
221+
return None, None # No error
222+
finally:
223+
if channel:
224+
channel.close()
225+
226+
def test_dns_failure_includes_debug_and_channel_state(self):
227+
"""Test DNS resolution failure includes diagnostic info."""
228+
error, error_info = self._test_connection("http://baddomain.invalid:19530")
229+
230+
assert error is not None
231+
assert error.code() == grpc.StatusCode.UNAVAILABLE
232+
assert "debug=" in error_info
233+
assert "channel_state=" in error_info
234+
assert "DNS" in error_info or "name" in error_info.lower()
235+
236+
def test_connection_refused_includes_debug_and_channel_state(self):
237+
"""Test connection refused includes diagnostic info."""
238+
error, error_info = self._test_connection("http://127.0.0.1:19999")
239+
240+
assert error is not None
241+
assert error.code() == grpc.StatusCode.UNAVAILABLE
242+
assert "debug=" in error_info
243+
assert "channel_state=TRANSIENT_FAILURE" in error_info
244+
assert "refused" in error_info.lower() or "connect" in error_info.lower()
245+
246+
def test_deadline_exceeded_shows_connecting_state(self):
247+
"""Test that deadline exceeded with unresponsive server shows CONNECTING state."""
248+
# 8.8.8.8:80 is likely to timeout (firewall drops packets)
249+
error, error_info = self._test_connection("http://8.8.8.8:80", timeout=3.0)
250+
251+
assert error is not None
252+
assert error.code() == grpc.StatusCode.DEADLINE_EXCEEDED
253+
assert "debug=" in error_info
254+
assert "channel_state=CONNECTING" in error_info
255+
256+
257+
if __name__ == "__main__":
258+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)