Conversation

Sparks0219 (Contributor)

Why are these changes needed?

The issue with the current implementation of the core worker's HandleKillActor is that it won't send a reply when the RPC completes, because the worker is dead by then. The GCS application code doesn't really care today, since it just logs the response if one is received; a response is only sent if the actor ID in the RPC doesn't match the actor on the worker, and the GCS will just log it and move on with its life.

Hence, on a transient network failure we can't differentiate whether there was a network issue or the actor was successfully killed. I think the most straightforward approach is that instead of the GCS directly calling the core worker's KillActor, the GCS talks to the raylet and calls a new RPC, KillLocalActor, which in turn calls KillActor. Since the raylet that receives KillLocalActor is local to the worker the actor is on, we're guaranteed to kill it at that point (either through KillActor, or, if it hangs, by falling back to SIGKILL).

Thus the main intuition is that the GCS now talks to the raylet, and this layer implements retries. Once the raylet receives the KillLocalActor request, it routes it to KillActor. The layer between the raylet and the core worker does not have retries enabled, because we can assume RPCs between the local raylet and its worker won't fail (same machine). We then check the worker's status after a while (5 seconds, via kill_worker_timeout_milliseconds), and if it still hasn't been killed we call DestroyWorker, which sends the SIGKILL. A rough sketch of this flow is below.
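
A minimal sketch of the raylet-side flow described above, not the actual Ray implementation; send_kill_actor_to_worker, worker_still_alive, and force_destroy_worker are hypothetical placeholders for the real raylet and worker-pool calls:

#include <boost/asio.hpp>
#include <functional>
#include <memory>

void HandleKillLocalActorSketch(boost::asio::io_context &io,
                                const std::function<void()> &send_kill_actor_to_worker,
                                const std::function<bool()> &worker_still_alive,
                                const std::function<void()> &force_destroy_worker,
                                int64_t kill_worker_timeout_ms) {
  // Graceful path: no retries between the raylet and its local worker, since
  // they are on the same machine.
  send_kill_actor_to_worker();

  // Fallback path: after kill_worker_timeout_milliseconds, escalate to
  // SIGKILL via the DestroyWorker path if the worker is still alive.
  auto timer = std::make_shared<boost::asio::deadline_timer>(
      io, boost::posix_time::milliseconds(kill_worker_timeout_ms));
  timer->async_wait([timer, worker_still_alive, force_destroy_worker](
                        const boost::system::error_code &ec) {
    if (!ec && worker_still_alive()) {
      force_destroy_worker();
    }
  });
}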

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run pre-commit jobs to lint the changes in this PR. (pre-commit setup)
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: joshlee <[email protected]>
@Sparks0219 (Contributor Author)

A couple of comments about my design and concerns I have:
1.) I decided to keep KillActor rather than have the raylet use the core worker's Exit, since I think that would get a bit bloated. From what I can see they both call into the shutdown coordinator, but each juggles a different set of RPC-provided params and uses them to set various shutdown coordinator fields. So I think it would just get messier if I used Exit for KillLocalActor, with not much benefit.
2.) From what I can see, KillActor should already be idempotent because of the changes in d63f464, i.e. we track whether there's an ongoing shutdown request and subsequent ones are no-ops (a simplified sketch of this guard pattern follows the list).
3.) In the current behavior of KillActor, we don't really have any mitigation if a graceful shutdown gets stuck. Hence I added a timer and an async_wait call that force-kills after a certain amount of time (5s). This isn't ideal, because if we get a DisconnectClient message we should immediately cancel the timer, but I don't think it's too important since the GCS isn't blocked waiting for the response.
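
As a simplified illustration of the guard mentioned in point 2 (not the actual code from d63f464), the pattern is roughly:

#include <atomic>

// Sketch of an idempotent shutdown guard: only the first request initiates
// shutdown, and any later KillActor-style request becomes a no-op.
class ShutdownGuardSketch {
 public:
  // Returns true only for the request that actually initiates shutdown.
  bool TryInitiateShutdown() {
    bool expected = false;
    return shutdown_requested_.compare_exchange_strong(expected, true);
  }

 private:
  std::atomic<bool> shutdown_requested_{false};
};

So a duplicate KillLocalActor retried by the GCS just observes the in-progress shutdown and returns.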

What are your thoughts, @codope?

Sparks0219 added the go (add ONLY when ready to merge, run all tests) label on Oct 11, 2025
Sparks0219 requested review from codope and dayshah on October 11, 2025 23:29
Sparks0219 marked this pull request as ready for review on October 11, 2025 23:30
Sparks0219 requested a review from a team as a code owner on October 11, 2025 23:30
worker->rpc_client()->KillActor(
kill_actor_request,
[kill_actor_error, timer](const ray::Status &status, const rpc::KillActorReply &) {
*kill_actor_error = status;
Sparks0219 (Contributor Author):

Not sure how to do this more elegantly; the only possible case where KillActor responds is if we hit this error check at

if (intended_actor_id != worker_context_->GetCurrentActorID()) {

and I want to propagate this error to the callback below if cancellation happens.

Contributor:

imo don't need to propagate back up to the gcs, since it throws it away

request.set_force_kill(force_kill);
auto actor_client = worker_client_pool_.GetOrConnect(actor->GetAddress());
auto actor_raylet_client =
raylet_client_pool_.GetOrConnectByAddress(actor->GetLocalRayletAddress());

Bug: Actor Termination Fails Due to Uninitialized Raylet Address

The local_raylet_address_ in GcsActor is only set when a worker lease is granted, not during construction or for actors loaded from GCS storage after a restart. This means NotifyCoreWorkerToKillActor can receive an uninitialized address for actors that haven't completed scheduling or were pre-existing, causing raylet_client_pool_.GetOrConnectByAddress() to fail and preventing actor termination.

Additional Locations (1)


ray-gardener bot added the core (Issues that should be addressed in Ray Core) label on Oct 12, 2025
Comment on lines +63 to +70
const rpc::Address &GcsActor::GetLocalRayletAddress() const {
return local_raylet_address_;
}

void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
local_raylet_address_.CopyFrom(address);
}

Contributor:

Nit, but usually C++ getters and setters look like this:

Suggested change

- const rpc::Address &GcsActor::GetLocalRayletAddress() const {
-   return local_raylet_address_;
- }
- void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
-   local_raylet_address_.CopyFrom(address);
- }
+ const rpc::Address &GcsActor::LocalRayletAddress() const {
+   return local_raylet_address_;
+ }
+ rpc::Address &GcsActor::LocalRayletAddress() {
+   return local_raylet_address_;
+ }

actor_local_raylet_address.set_node_id(node->node_id());
actor_local_raylet_address.set_ip_address(node->node_manager_address());
actor_local_raylet_address.set_port(node->node_manager_port());
actor->UpdateLocalRayletAddress(actor_local_raylet_address);
Contributor:

On actor death, in between restarts, there's probably a point where the actor doesn't have a local raylet. The actor also doesn't have a local raylet at registration time and before creation completes.

local_raylet_address should probably be an optional, and we shouldn't make the RPC if it's nullopt.
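
A rough sketch of that suggestion, not the actual GcsActor code (Address stands in for rpc::Address):

#include <optional>
#include <string>

struct Address {
  std::string node_id;
  std::string ip_address;
  int port = 0;
};

class GcsActorSketch {
 public:
  void UpdateLocalRayletAddress(const Address &address) { local_raylet_address_ = address; }

  const std::optional<Address> &GetLocalRayletAddress() const { return local_raylet_address_; }

 private:
  // Unset until a worker lease is granted, so callers must check before use.
  std::optional<Address> local_raylet_address_;
};

// Caller side (e.g. in NotifyCoreWorkerToKillActor): only issue the RPC when
// the address has actually been set.
//
//   const auto &addr = actor->GetLocalRayletAddress();
//   if (addr.has_value()) {
//     auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(*addr);
//     // ... send KillLocalActor ...
//   }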

@@ -1220,6 +1220,52 @@ INSTANTIATE_TEST_SUITE_P(PinObjectIDsIdempotencyVariations,
PinObjectIDsIdempotencyTest,
testing::Bool());

TEST_F(NodeManagerTest, TestHandleKillLocalActorIdempotency) {
// Test that calling HandleKillLocalActor twice results in KillActor RPC being
// called twice on the core worker client. HandleKillLocalActor is a simple wrapper
Contributor:

Should also test that we don't send the KillActor RPC if the raylet already knows the worker is dead.

timer->async_wait([this, send_reply_callback, kill_actor_error, worker_id, timer](
const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
send_reply_callback(*kill_actor_error, nullptr, nullptr);
Contributor:

there seems to be a lot of work to propagate this status back up to the gcs even though the gcs just throws the status away


timer->expires_from_now(boost::posix_time::milliseconds(
RayConfig::instance().kill_worker_timeout_milliseconds()));
timer->async_wait([this, send_reply_callback, kill_actor_error, worker_id, timer](
Contributor:

I think execute_after does some of this boilerplate for you.

Also, this is new functionality we're adding to kill actor, right? In what cases will the worker survive after it got the KillActor request?
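
For reference, a sketch of the boilerplate such a helper would hide, assuming execute_after is roughly a thin wrapper over a boost::asio timer (the real helper's signature may differ):

#include <boost/asio.hpp>
#include <functional>
#include <memory>

void ExecuteAfterSketch(boost::asio::io_context &io,
                        std::function<void()> fn,
                        int64_t delay_ms) {
  // Keep the timer alive in the completion handler's capture list.
  auto timer = std::make_shared<boost::asio::deadline_timer>(
      io, boost::posix_time::milliseconds(delay_ms));
  timer->async_wait([timer, fn = std::move(fn)](const boost::system::error_code &ec) {
    if (!ec) {
      fn();  // Only run the callback if the timer wasn't cancelled.
    }
  });
}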
