
Conversation

sampan-s-nayak
Contributor

@sampan-s-nayak sampan-s-nayak commented Aug 20, 2025

Why are these changes needed?

This is the second of two PRs for supporting sending events from the aggregator to GCS. This PR builds on top of #55780 to support publishing Ray events to GCS from the aggregator.

Related issue number

NA

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

Adds GCS publishing alongside HTTP for aggregator events, introduces task metadata buffering, and centralizes event filtering/config with updated tests and metrics; a rough wiring sketch follows this note.

  • Aggregator Agent:
    • Add optional GCS publishing path (PUBLISH_EVENTS_TO_GCS) alongside HTTP; wire RayEventPublisher for ray_gcs and http_service and run both.
    • Integrate TaskMetadataBuffer to accumulate task_events_metadata (dropped task attempts) and include in GCS publishes.
    • Remove in-agent event filtering; delegate filtering to publisher clients.
  • Publisher Clients:
    • Add AsyncGCSPublisherClient to send RayEventsData to GCS with filtering (GCS_EXPOSABLE_EVENT_TYPES).
    • Refactor AsyncHttpPublisherClient to self-manage exposable HTTP event types (HTTP_EXPOSABLE_EVENT_TYPES) and remove external filter fn.
    • Introduce common PublishBatch (now carries task_events_metadata) and shared exposable-event logic.
  • Configs:
    • Add HTTP/GCS-specific exposable event settings; new env var RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES.
  • Metrics:
    • Per-consumer tags now use names http_service and ray_gcs; tests assert consumer-scoped counters.
  • Tests:
    • New tests for GCS-only and dual publish, GCS filtering behavior, and TaskMetadataBuffer.
    • Update existing tests to new env vars and consumer names; extend metrics tests for GCS.

Written by Cursor Bugbot for commit 47b524c.
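Below is a minimal, self-contained sketch of the dual-publisher pattern the note describes. The consumer names (http_service, ray_gcs) and the per-client filtering mirror the summary above; the queue, the stand-in client class, and the print-based "publish" are illustrative assumptions, not the PR's actual classes.

import asyncio

HTTP_EXPOSABLE_EVENT_TYPES = {"TASK_DEFINITION_EVENT", "TASK_EXECUTION_EVENT"}
GCS_EXPOSABLE_EVENT_TYPES = {"TASK_EXECUTION_EVENT"}


class FakePublisherClient:
    """Stand-in for AsyncHttpPublisherClient / AsyncGCSPublisherClient:
    each client filters events itself instead of the agent doing it."""

    def __init__(self, name, exposable_event_types):
        self.name = name
        self.exposable_event_types = exposable_event_types

    async def publish(self, batch):
        kept = [e for e in batch if e["type"] in self.exposable_event_types]
        print(f"{self.name}: published {len(kept)}, filtered {len(batch) - len(kept)}")


async def publisher_loop(client, queue):
    """One RayEventPublisher-style loop per consumer."""
    while True:
        batch = await queue.get()
        if batch is None:  # shutdown sentinel
            return
        await client.publish(batch)


async def main():
    consumers = {
        "http_service": FakePublisherClient("http_service", HTTP_EXPOSABLE_EVENT_TYPES),
        "ray_gcs": FakePublisherClient("ray_gcs", GCS_EXPOSABLE_EVENT_TYPES),
    }
    queues = {name: asyncio.Queue() for name in consumers}
    loops = [
        asyncio.create_task(publisher_loop(client, queues[name]))
        for name, client in consumers.items()
    ]
    batch = [{"type": "TASK_DEFINITION_EVENT"}, {"type": "TASK_EXECUTION_EVENT"}]
    for q in queues.values():  # the agent fans every batch out to both consumers
        await q.put(batch)
        await q.put(None)
    await asyncio.gather(*loops)


asyncio.run(main())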

@sampan-s-nayak sampan-s-nayak requested a review from a team as a code owner August 20, 2025 13:59
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request adds support for publishing Ray events from the aggregator to GCS. The changes include a new GCSPublisher, a TaskMetadataBuffer to handle task-related metadata, and associated configurations, metrics, and tests. The implementation is solid, but I've found a few areas for improvement. There's a high-severity bug in TaskMetadataBuffer where a method has inconsistent return types, which could lead to runtime errors. I've also pointed out a couple of medium-severity issues in GCSPublisher related to type hint correctness and overly broad exception handling. Addressing these points will improve the robustness and maintainability of the new functionality.

Signed-off-by: sampan <[email protected]>
@can-anyscale can-anyscale self-requested a review August 20, 2025 17:28
@edoakes
Collaborator

edoakes commented Aug 20, 2025

will review after other is merged

@ray-gardener ray-gardener bot added core Issues that should be addressed in Ray Core observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling labels Aug 20, 2025
Signed-off-by: sampan <[email protected]>
@sampan-s-nayak sampan-s-nayak marked this pull request as draft August 21, 2025 07:20
if PUBLISH_EVENTS_TO_GCS:
    logger.info("Publishing events to GCS is enabled")
    self._event_processing_enabled = True
    self._async_gcs_channel = create_gcs_channel(self.gcs_address, aio=True)
Collaborator


Don't use create_gcs_channel, which uses Python grpcio. Use GcsClient from Cython instead.
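A rough sketch of the suggested direction, for reference only: the GcsClient constructor arguments and the async_add_events call (the binding added later in this PR) are assumptions about the final shape, not the actual change.

from ray._raylet import GcsClient


async def publish_to_gcs(gcs_address: str, serialized_add_events_request: bytes) -> None:
    # Cython-backed client instead of a grpcio channel from create_gcs_channel.
    gcs_client = GcsClient(address=gcs_address)
    # async_add_events is the binding introduced in this PR; timeout made explicit here.
    await gcs_client.async_add_events(serialized_add_events_request, timeout_s=30)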

Signed-off-by: sampan <[email protected]>
):
    self._buffer_maxlen = max(
        max_buffer_size - 1, 1
    )  # -1 to account for the current batch


Bug: Buffer Capacity Miscalculation

The _buffer_maxlen calculation max(max_buffer_size - 1, 1) causes the buffer to exceed its intended total capacity. When max_buffer_size is 1, the buffer can hold two entries (one in the deque, one in the current batch), instead of the configured one.
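A quick, self-contained illustration of the finding (the deque-plus-current-batch shape is assumed from the snippet above):

from collections import deque

max_buffer_size = 1
buffer = deque(maxlen=max(max_buffer_size - 1, 1))  # maxlen becomes 1, not 0
current_metadata_batch = ["dropped-attempt-A"]

# Rotate: the current batch moves into the deque and a new current batch starts.
buffer.append(current_metadata_batch)
current_metadata_batch = ["dropped-attempt-B"]

print(len(buffer) + 1)  # 2 entries held in total, even though max_buffer_size is 1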


@sampan-s-nayak sampan-s-nayak requested a review from jjyao October 7, 2025 16:55
Contributor

@can-anyscale can-anyscale left a comment


LGTM; just a few nits

DEFAULT_HTTP_EXPOSABLE_EVENT_TYPES = (
    "TASK_DEFINITION_EVENT,TASK_EXECUTION_EVENT,"
    "ACTOR_TASK_DEFINITION_EVENT,ACTOR_TASK_EXECUTION_EVENT,"
    "ACTOR_TASK_EXECUTION_EVENT,"
Contributor


nit: is this change unrelated to this PR? It's fine for now, but let's try to put unrelated changes in separate PRs to minimize the PR's size. That'll help speed up review and code merging.

Contributor Author


We no longer have a separate event for ACTOR_TASK_DEFINITION_EVENT, so I'm just removing it. It should not have any impact.

for task_event in reply.events_by_task:
    if task_event.task_info.name.lower() == unique_task_name.lower():
        return task_event
task = list_tasks(filters=[("name", "=", unique_task_name)])
Contributor


nice, thanks

# Max number of threads for the thread pool executor handling CPU intensive tasks
THREAD_POOL_EXECUTOR_MAX_WORKERS = ray_constants.env_integer(
    f"{env_var_prefix}_THREAD_POOL_EXECUTOR_MAX_WORKERS", 1
    f"{env_var_prefix}_THREAD_POOL_EXECUTOR_MAX_WORKERS", 2
Contributor


Why are we changing this? Will it potentially cause regressions in some release tests? Ideally, let's put it in a different PR if it's not critical for the correctness of this PR.

Contributor Author


Let me revert this. I did this to handle the scenario where both the HTTP and GCS publishers are enabled; in that case we would need two threads for maximum concurrency, since the HTTP publisher may use one thread to ser/de JSON while the GCS publisher uses another to ser/de proto. But given that we are unlikely to use both publishers in the majority of cases, let me restrict the number of threads to 1 by default.
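For context, a small sketch of the trade-off being discussed (the function names and executor wiring are illustrative, not the aggregator's actual code): with max_workers=1 the two publishers' CPU-bound serialization runs back-to-back; with 2 it can overlap.

import asyncio
import json
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)  # the default being kept; 2 would let both overlap


async def http_serialize(events: list) -> str:
    # HTTP publisher path: JSON ser/de offloaded to the shared executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, json.dumps, events)


async def gcs_serialize(events_proto) -> bytes:
    # GCS publisher path: proto ser/de offloaded to the same executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, events_proto.SerializeToString)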

#############################################################
# GcsRpcClient methods
#############################################################
async def async_add_events(self, serialized_request: bytes, timeout_s=None, executor=None) -> int:
Contributor


Similarly, this is fine for this PR but should be in another self-contained PR (this PR is now quite large for me to go through everything in my brain context window ;))

Contributor Author


I added this to the current PR because I recently found out that we don't want to use the Python gRPC library and should use the Cython bindings instead. I agree, I should have created a PR to add these bindings first and then raised this (current) PR on top of it. Will keep this in mind next time.

CAddEventsRequest c_req
int64_t timeout_ms
fut = incremented_fut()
timeout_ms = round(1000 * timeout_s) if timeout_s else -1
Contributor


does a -1 timeout mean it will hang forever? maybe use 30s as a default?

Contributor Author


Yeah, -1 is the same as having no timeout. I think we can keep this as -1 since it's the default behaviour (I see a similar pattern in other Python libraries, e.g. the HTTP requests library has no timeout by default and needs one to be configured explicitly). Consumers of this API can override it with a more reasonable value.
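For clarity, the sentinel convention being discussed, as a standalone helper (a sketch; note that the expression also treats an explicit timeout_s=0 as "no timeout", since 0 is falsy):

from typing import Optional


def to_timeout_ms(timeout_s: Optional[float]) -> int:
    # Mirrors the conversion in the binding: None (or 0) means no timeout, i.e. -1.
    return round(1000 * timeout_s) if timeout_s else -1


assert to_timeout_ms(None) == -1
assert to_timeout_ms(2.5) == 2500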

if m.name == descriptor and m.labels[CONSUMER_TAG_KEY] == consumer_name
]
if not samples:
print("missing: ", descriptor)
Contributor


nit: remove

Contributor Author

@sampan-s-nayak sampan-s-nayak Oct 10, 2025


thanks for flagging this!

Signed-off-by: sampan <[email protected]>
move(c_req),
StatusPyCallback(convert_status, assign_and_decrement_fut, fut),
timeout_ms)
return await asyncio.wrap_future(fut)


Bug: Async Method Threading and Error Handling Issues

The async_add_events method has a critical threading bug: when an executor is used, the stack-allocated c_req C++ object is accessed from a different thread. This cross-thread access to stack memory can lead to memory corruption or crashes. Separately, the method's error handling is inconsistent, raising a ValueError for parse errors while its signature implies an int return.
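One possible shape for a consistent contract, sketched with injected stand-ins (parse and send_rpc are hypothetical placeholders for the binding's internals, not real Ray APIs): always raise on failure rather than mixing a raised ValueError with an int status.

class PublishError(RuntimeError):
    """Single failure type for both parse and RPC errors."""


async def add_events_checked(parse, send_rpc, serialized_request: bytes) -> None:
    try:
        request = parse(serialized_request)
    except Exception as e:
        raise PublishError(f"malformed AddEventsRequest: {e}") from e
    status_code = await send_rpc(request)
    if status_code != 0:
        raise PublishError(f"AddEvents RPC failed with status {status_code}")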


Contributor

@can-anyscale can-anyscale left a comment


@edoakes
Copy link
Collaborator

edoakes commented Oct 10, 2025

Prior to merging, shall we benchmark this against the baseline of directly sending events from core worker?

Comment on lines 690 to 691
#############################################################
# GcsRpcClient methods
Collaborator


Suggested change
#############################################################
# GcsRpcClient methods
#############################################################
# GcsRpcClient methods

///
/// TODO: Extend this macro to support other callback types defined in
/// gcs_callback_types.h
#define VOID_GCS_RPC_CLIENT_METHOD_WITH_STATUS_CALLBACK(SERVICE_NAMESPACE, \
Collaborator


Why do we need to define a new macro here?

Contributor Author


This was added to support gcs::StatusCallback, which is required to add the Cython bindings for this API. Just saw #55781 (comment); I will remove this macro and use the accessor class instead of the RPC client in the Cython binding.

return await asyncio.wrap_future(fut)

with nogil:
    self.inner.get().GetGcsRpcClient().AddEvents(
Collaborator


We should add an AsyncAddEvents to the TaskInfoAccessor and use it here.

@sampan-s-nayak
Contributor Author

Prior to merging, shall we benchmark this against the baseline of directly sending events from core worker?

Yes, I am working on this by running some release tests; I'm seeing some segmentation faults which I am trying to debug.

move(c_req),
StatusPyCallback(convert_status, assign_and_decrement_fut, fut),
timeout_ms)
return await asyncio.wrap_future(fut)


Bug: Error Handling Inconsistency in async_add_events

The async_add_events method handles protobuf parsing errors by raising a ValueError, but gRPC errors return an integer status code. This inconsistency makes error handling unpredictable for callers and conflicts with the -> int return type annotation.


sampan added 2 commits October 13, 2025 07:59
Signed-off-by: sampan <[email protected]>
Signed-off-by: sampan <[email protected]>
is_publish_successful=False,
num_events_published=0,
num_events_filtered_out=0,
)


Bug: GCS RPC vs HTTP Status Code Comparison Error

The AsyncGCSTaskEventsPublisherClient.publish method incorrectly compares the GCS RPC status code with an HTTP status code, which can misinterpret GCS publishing outcomes. Additionally, on any publishing failure, the returned PublishStats incorrectly sets num_events_filtered_out to 0, losing accurate filtering metrics.
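A sketch of what the finding suggests (the PublishStats fields are taken from the snippet above; the OK constant and the stats-construction helper are assumptions): check the GCS status convention rather than HTTP 200, and keep the filtered-out count even on failure.

from dataclasses import dataclass

GCS_STATUS_OK = 0  # assumed: GCS RPC statuses use 0/OK for success, unlike HTTP's 200


@dataclass
class PublishStats:
    is_publish_successful: bool
    num_events_published: int
    num_events_filtered_out: int


def to_publish_stats(gcs_status_code: int, num_published: int, num_filtered: int) -> PublishStats:
    if gcs_status_code == GCS_STATUS_OK:
        return PublishStats(True, num_published, num_filtered)
    # On failure, nothing was published, but the filtered count is still accurate.
    return PublishStats(False, 0, num_filtered)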


Signed-off-by: sampan <[email protected]>
"env_vars": {
# Enable both publishers
"RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_GCS": "True",
"RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SVC": "True",


Bug: Incorrect Environment Variable Causes Test Failures

The tests use an incorrect environment variable name (RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SVC instead of RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE) to control HTTP event publishing. This prevents the HTTP publisher from being enabled or disabled as intended, leading to inaccurate test results.


Signed-off-by: sampan <[email protected]>
@sampan-s-nayak sampan-s-nayak requested a review from jjyao October 13, 2025 14:15
Signed-off-by: Sampan S Nayak <[email protected]>
{
    "env_vars": {
        # Disable HTTP publisher to test GCS filtering in isolation
        "RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SVC": "False",


Bug: Ray Dashboard HTTP Publisher Typo

The environment variable RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SVC has a typo, using _SVC instead of _SERVICE. This prevents the HTTP publisher from being disabled as intended, which could affect tests relying on its isolation.


):
    self._buffer_maxlen = max(
        max_buffer_size - 1, 1
    )  # -1 to account for the current batch


Bug: Buffer Capacity Miscalculation

The _buffer_maxlen calculation max(max_buffer_size - 1, 1) incorrectly determines the buffer's capacity. It misaccounts for _current_metadata_batch, leading to an actual total capacity that doesn't align with the max_buffer_size expectation, especially for small values.

