
Add @metric decorator for SDK-side metric evaluation #1436

Open

akwasigroch wants to merge 10 commits into main from feat/code-metrics

Conversation

@akwasigroch (Collaborator)

Purpose

Add support for user-defined metrics that execute on the SDK client side, using the same connector infrastructure as @endpoint. This allows users to define custom evaluation metrics in Python that are registered, synced to the backend DB, and invoked remotely during test evaluation.

What Changed

SDK

  • New @metric decorator with strict signature validation (required: input, output; optional: expected_output, context)
  • MetricRegistry for storing metric callables and metadata
  • MetricMetadata, ExecuteMetricMessage, MetricResultMessage schemas
  • ConnectorManager handles execute_metric messages and sends metric_result responses
  • TestExecutor.execute_metric() with input filtering based on accepted params
  • RegisterMessage extended with metrics list
  • Top-level export: from rhesis.sdk import metric
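The contract described above (required: input, output; optional: expected_output, context) can be sketched as follows. This is an illustrative stand-in for the validation logic, not the actual rhesis.sdk implementation:

```python
import inspect

REQUIRED = {"input", "output"}
OPTIONAL = {"expected_output", "context"}

def metric(func):
    """Hypothetical sketch of strict signature validation at decoration time."""
    params = set(inspect.signature(func).parameters)
    missing = REQUIRED - params
    invalid = params - REQUIRED - OPTIONAL
    if missing:
        raise TypeError(f"@metric function missing required params: {sorted(missing)}")
    if invalid:
        raise TypeError(f"@metric function has unsupported params: {sorted(invalid)}")
    return func

@metric
def exact_match(input: str, output: str, expected_output: str = "") -> float:
    # Score 1.0 only when the model output matches the expectation exactly.
    return 1.0 if output == expected_output else 0.0
```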

Backend

  • Connector schemas mirrored: MetricMetadata, ExecuteMetricMessage, MetricResultMessage
  • metric_sync.py: syncs SDK metrics to DB as Metric rows with backend_type=sdk
  • RegistrationHandler calls metric sync on registration
  • ConnectionManager: stores metric registries, handles metric_result, adds send_metric_request / send_and_await_metric_result, and _forward_metric_to_sdk for RPC
  • SDKRpcClient.send_and_await_metric_result() for Redis RPC from Celery workers
  • MetricEvaluator: splits metrics by backend, routes backend=sdk through connector RPC, merges results

Tests (35 new tests)

  • test_metric_registry.py — MetricRegistry CRUD and metadata generation
  • test_metric_decorator.py — signature validation, constants, DisabledClient behavior
  • test_executor_metric.py — sync/async execution, input filtering, error handling, result normalization

Testing

cd sdk && uv run pytest ../tests/sdk/connector/test_metric_registry.py ../tests/sdk/connector/test_metric_decorator.py ../tests/sdk/connector/test_executor_metric.py -v

End-to-end: run playground/run_metric.py with a local backend, then trigger a test run with SDK metrics from the platform.

…metrics

Add support for registering and executing metrics on the SDK client side
via the connector, mirroring the @endpoint pattern. Metrics are registered
with a strict contract (required: input, output; optional: expected_output,
context) and communicated over WebSocket using new execute_metric /
metric_result message types.
Add backend support for executing @metric-decorated SDK functions
during test evaluation. Includes connector schemas, DB sync,
RPC routing, and MetricEvaluator integration for backend=sdk.
Cover MetricRegistry CRUD, @metric signature validation with
required/optional/invalid params, DisabledClient no-op behavior,
and TestExecutor.execute_metric for sync/async/error paths.
…old operators

Added the ability to specify a threshold operator in the MetricEvaluator class, allowing for more flexible score evaluations based on the configured operator.

@peqy bot left a comment


The SDK-side @metric support plus backend RPC routing looks cohesive overall.

Main things to tighten:

  • SDK should always respond with a metric_result even on handler exceptions (avoid backend timeouts).
  • Backend SDK metric evaluation currently blocks async contexts by spinning threads + asyncio.run; consider an async pipeline or less blocking integration.
  • Backend ConnectionManager should guard _metric_results against unbounded growth / late arrivals.
  • Registration handler should use parsed reg_msg.metrics instead of raw message.get("metrics") for consistent validation.

Found 4 issues (0 critical, 4 improvements).

except Exception as e:
logger.error(f"❌ Failed to publish RPC response for {test_run_id}: {e}", exc_info=True)

def _resolve_metric_result(self, metric_run_id: str, result: Dict[str, Any]) -> None:

Improvement: metric results are stored in _metric_results without any guard/TTL like _cancelled_tests has for tests. If a metric_result arrives for a run that is never awaited/cleaned up (or after timeout), this dict can grow unbounded.

Fix: mirror the cancelled-test mechanism for metrics (e.g., track timed-out metric_run_ids and ignore late arrivals, plus periodic trimming), or add a timestamp/TTL-based cleanup.

Added functionality to track cancelled or timed-out metric runs using an OrderedDict. This prevents late results from being stored and includes a cleanup mechanism to manage memory usage by limiting the number of cancelled metrics retained. Updated the cleanup_metric_result method to mark metrics as cancelled and added a new method for periodic cleanup of old cancelled metrics.
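The cancellation-tracking mechanism described above could look roughly like this (class and method names are illustrative, not the actual backend code):

```python
from collections import OrderedDict

MAX_CANCELLED = 1000  # cap on retained cancelled metric_run_ids

class MetricResultStore:
    """Sketch: ignore late results for cancelled/timed-out metric runs,
    and bound memory by trimming the oldest cancelled entries."""

    def __init__(self):
        self._metric_results = {}
        self._cancelled = OrderedDict()

    def mark_cancelled(self, metric_run_id: str) -> None:
        self._cancelled[metric_run_id] = True
        while len(self._cancelled) > MAX_CANCELLED:
            self._cancelled.popitem(last=False)  # drop oldest first

    def store_result(self, metric_run_id: str, result) -> bool:
        if metric_run_id in self._cancelled:
            return False  # late arrival: drop it
        self._metric_results[metric_run_id] = result
        return True
```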

@peqy bot left a comment


Overall this is a solid, well-scoped feature addition (decorator + registry + connector RPC + evaluator routing). A couple edge cases could bite in practice:

  • SDK @metric wrapper currently forces kwargs-only calls.
  • Backend metric sync/registration should handle the “empty metrics list” case to correctly clear/mark-inactive SDK metrics.

Found 3 issues (0 critical, 3 improvements).

if inspect.iscoroutinefunction(func):

@wraps(func)
async def wrapper(**kwargs: Any) -> Any:

Improvement: @metric wraps the function as wrapper(**kwargs) which makes the decorated callable kwargs-only (positional calls like toxicity("in","out") will fail). Other decorators here preserve *args, **kwargs.

Fix: consider wrapper(*args, **kwargs) and use inspect.signature(func).bind_partial(*args, **kwargs) to filter to accepted params while preserving normal calling conventions.
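The suggested fix might look like this sketch, which preserves positional calls while filtering unknown keyword arguments via bind_partial (validation logic omitted for brevity; names are illustrative):

```python
import inspect
from functools import wraps

def metric(func):
    """Sketch: wrap with *args/**kwargs so positional calls keep working."""
    sig = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Drop keyword arguments the metric does not accept, then bind.
        accepted = {k: v for k, v in kwargs.items() if k in sig.parameters}
        bound = sig.bind_partial(*args, **accepted)
        return func(*bound.args, **bound.kwargs)

    return wrapper

@metric
def toxicity(input, output):
    return 1.0 if "bad" in output else 0.0
```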

logger.info(f"sync_function_endpoints returned: {stats}")

# Sync SDK metrics
if metrics_data:

Improvement: gating sync on if metrics_data: means we never mark previously-registered SDK metrics inactive when the SDK sends an empty metrics list (e.g., user removes all metrics). Also message.get("metrics", []) can’t distinguish “old SDK that doesn’t send metrics” vs “new SDK intentionally sent none”.

Fix: check key presence (if "metrics" in message:) and call sync_sdk_metrics(...) even when the list is empty.
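A minimal sketch of that presence check (function names are hypothetical):

```python
def handle_metrics_field(message: dict, sync_sdk_metrics) -> str:
    """Distinguish 'field absent' (old SDK) from 'present but empty'
    (metrics intentionally cleared)."""
    if "metrics" in message:
        # Sync even when the list is empty, so removed metrics are marked inactive.
        sync_sdk_metrics(message["metrics"])
        return "synced"
    # Field absent: an older SDK that never sends metrics; do nothing.
    return "skipped"
```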

try:
reg_msg = RegisterMessage(**message)
self.register_functions(project_id, environment, reg_msg.functions)
if reg_msg.metrics:

Improvement: if reg_msg.metrics: self.register_metrics(...) leaves stale metrics in _metric_registries when an updated registration includes an empty metrics list (or intends to clear).

Fix: if the registration message includes the metrics field, call register_metrics(project_id, environment, reg_msg.metrics) even when it’s empty (or explicitly clear the existing entry).

…ding

Slim down the WebSocket router to authenticate only (via FastAPI
Depends), deferring project/environment binding to the register
message. Introduce immutable WebSocketConnectionContext, extract
message loop, and bridge registration into legacy dicts for
backward compatibility.
…ager

Restructure internal data model into two layers: transport
(connection_id -> WebSocket) and routing (project:env -> connection_id).
Add project authorization at registration time using the immutable
connection context. Replace direct _connections dict access with
has_local_route() public method.
…aiting

Replace busy-loop polling (sleep 100ms) in send_and_await_result and
send_and_await_metric_result with asyncio.Event-based notification.
Results now wake waiters instantly via event.set(), reducing latency
and eliminating wasted CPU cycles.
Add connection-level key (ws:conn:{id}) set at connect time for
connection-scoped dispatch. Remove redundant ws:connection: keys.
Consolidate per-project heartbeats into a single per-connection
heartbeat that refreshes both connection and project routing keys.
…tion

Replace project_id/environment/connection_id params in MetricEvaluator
with an injected sdk_metric_sender callable. Callers build the sender
closure with their routing context; the evaluator has zero knowledge
of connections, projects, or RPC clients. Add dual dispatch support
in RPC client and connection manager (project:env and connection_id).

@peqy bot left a comment


Solid end-to-end feature (SDK decorator/registry + backend routing/RPC + sync) and the new routing split (connection_id vs project:env) is a good direction.

Main things to address are around stale-connection cleanup for connection-scoped metric RPC, potential event-key collisions between tests/metrics, and the current sync-to-async bridging in MetricEvaluator being heavy/blocking in async contexts.

f"Connection {conn_id} not found",
)
return
await self._forward_metric_to_sdk(request_id, conn_id, websocket, name, inputs)

Improvement: In connection-scoped RPC dispatch (connection_id present), we pass conn_id into _forward_metric_to_sdk() as project_env_key.

On send failure _forward_metric_to_sdk() calls _cleanup_stale_routing(project_env_key) which will try to delete ws:routing:{conn_id} and won’t clear the real ws:conn:{conn_id} / heartbeat / _connections entry.

Fix: split cleanup paths for project-scoped vs connection-scoped dispatch (e.g., call disconnect_by_connection_id(conn_id) or add a _cleanup_stale_connection(conn_id) that removes ws:conn:{conn_id} + stops heartbeat).

"""
# Check if test run was cancelled/timed out - don't store late results
event = asyncio.Event()
self._result_events[metric_run_id] = event

Improvement: _result_events is shared for both test_run_id and metric_run_id keys. If a metric_run_id ever collides with a test_run_id, one waiter can clobber the other.

Fix: namespace the keys (e.g., test:{id} / metric:{id}) or maintain separate dicts for test vs metric events.


coro = self._sdk_metric_sender(metric_run_id, class_name, inputs)

if loop and loop.is_running():

Improvement: _evaluate_sdk_metrics() runs async RPC via asyncio.run(...), and when already in an event loop it spins up a ThreadPoolExecutor and blocks waiting for pool.submit(asyncio.run, coro).result(...).

This works around asyncio.run() limitations, but it’s heavy (new thread + new loop per metric) and can still block an async caller’s event loop thread.

Fix: consider providing an async evaluation path for SDK metrics (e.g., evaluate_async or make SDK evaluation concurrent via asyncio.gather when a loop is running), or move the “run coroutine in thread” helper to a shared utility used consistently (there’s a similar pattern elsewhere).
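A shared "run coroutine from sync code" helper of the kind suggested above could look like this sketch (the helper name and timeout are assumptions; as the comment notes, a fully async evaluation path would avoid the blocking entirely):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_coro_blocking(coro, timeout: float = 30.0):
    """Sketch: one shared helper instead of ad-hoc thread+loop per metric."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: asyncio.run is safe.
        return asyncio.run(coro)
    # Already inside a loop: hand off to a worker thread with its own loop.
    # This still blocks the caller's thread; an async pipeline avoids this.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result(timeout)
```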

type_name=SDK_BACKEND_TYPE_NAME,
type_value=SDK_BACKEND_TYPE_VALUE,
organization_id=organization_id,
user_id=user_id,

Improvement: sync_sdk_metrics is async but does only synchronous SQLAlchemy work (no awaits). This can be misleading for callers and suggests concurrency that isn’t there.

Fix: either make it a normal def (and keep call sites sync) or add a brief comment why it must be async (e.g., interface symmetry).

await websocket.close(code=1008, reason=error_msg)
return
except Exception as e:
logger.error(f"WebSocket error for {context.connection_id}: {e}")

Improvement: _message_loop wraps every message handling in with get_db_with_tenant_variables(...) as db:.

If this is high-traffic (ping/pong, results streaming), opening a DB session per message can be expensive.

Fix: if feasible, create one session per connection (or per N messages) and reuse it, or ensure handle_message paths that don’t need DB can avoid acquiring a session.

