[RayService] Support Incremental Zero-Downtime Upgrades #3166
Conversation
I've now added unit tests and one basic e2e test for the incremental upgrade feature, so this should be good to start reviewing. In addition to the unit tests, here are some instructions for manually testing this feature in your cluster.
I'll post more comments with manual test results and add more e2e test cases, but this should be good to start reviewing/iterating on to get merge-ready before the v1.4 release.
Tried to test this manually and I'm not seeing the Gateway reconcile. Here's the log line:
Do I need to set
No, it should be called automatically when
I'm running into some issues now with the allowed ports/protocols for listeners across different Gateway controllers (e.g. the GKE controller is pretty restrictive). I'm working on figuring out how to send traffic from the Serve service to the Gateway and on to the active and pending RayCluster head services through the HTTPRoute. An alternative would be to just have users send traffic directly to the Gateway, which would be set to HTTP and port
Discussed with Ryan offline: there's a validation in the GKE gateway controller that disallows port 8000 for Serve, but this validation will be removed soon. For now we will test with allowed ports like port 80 and change it back to 8000 before merging.
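For reference, here is a minimal sketch of the listener shape this discussion is about, using the upstream Gateway API Go types (sigs.k8s.io/gateway-api/apis/v1); the port parameter is the only thing that changes between the GKE workaround (80) and the eventual Serve default (8000). The helper name is illustrative, not code from this PR.

package sketch

import gwv1 "sigs.k8s.io/gateway-api/apis/v1"

// httpListener builds a plain HTTP listener on the given port, e.g. 80 while
// the GKE controller still rejects 8000, then 8000 once that validation is lifted.
func httpListener(port int32) gwv1.Listener {
    return gwv1.Listener{
        Name:     gwv1.SectionName("http"),
        Protocol: gwv1.HTTPProtocolType,
        Port:     gwv1.PortNumber(port),
    }
}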
I moved the e2e test to its own folder since it's an experimental feature and shouldn't be part of the pre-submit tests yet.
@ryanaoleary can you resolve all the merge conflicts? I can do some testing on this branch once the conflicts are resolved.
All the conflicts have been resolved. This is the image I'm currently using for testing: us-docker.pkg.dev/ryanaoleary-gke-dev/kuberay/kuberay:latest
Fixed a
Gateway:
HTTPRoute:
The behavior that
const (
    // During an upgrade, the IncrementalUpgrade strategy creates an upgraded cluster and gradually
    // scales it up while migrating traffic to it using the Gateway API.
    IncrementalUpgrade RayServiceUpgradeType = "IncrementalUpgrade"
Maybe too late to change this, but wondering if RollingUpgrade would be a more appropriate name? I assume most people are more familiar with this term. WDYT @ryanaoleary @kevin85421 @MortalHappiness
(not blocking this PR, we can change it during the alpha phase)
Late to reply to this, but I have no strong preference either way. IncrementalUpgrade is what was used in the feature request and REP, so that's why I stuck with it, but if there's a preference from any KubeRay maintainers or users I'm down to go through and change the feature name and all the related variable names.
cc @rueian for sharing an opinion.
I think RollingUpgrade is a more straightforward name for me too.
cc: @kevin85421, since from offline discussion you seemed to have a preference against using RollingUpgrade here.
@kevin85421 what do you think about ClusterUpgrade and ClusterUpgradeOptions? I prefer to keep the upgrade term generic, as the exact behavior could change in the future.
@Future-Outlier was also wondering about the history of why we called it "incremental" upgrades.
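For context, whatever the final name, opting into the strategy from a RayService spec would look roughly like the sketch below. This assumes the existing RayServiceUpgradeStrategy.Type field (already used for NewCluster/None) plus the IncrementalUpgrade value this PR adds; the accompanying options are omitted, and the helper is illustrative.

package sketch

import rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

// withIncrementalUpgrade enables the upgrade strategy added in this PR on a RayServiceSpec.
func withIncrementalUpgrade(spec *rayv1.RayServiceSpec) {
    upgradeType := rayv1.IncrementalUpgrade
    spec.UpgradeStrategy = &rayv1.RayServiceUpgradeStrategy{Type: &upgradeType}
}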
Just wondering if there is an update on this PR. Will it make it into a KubeRay 1.4.x or 1.5 release?
This PR is targeted for KubeRay v1.5; it still needs review, and I'll prioritize iterating and resolving comments to get it merged.
Commit history for this push:
- Update go mod dependencies for gateway v1
- Add reconcile Gateway and HTTPRoute
- Add TargetCapacity and TrafficRoutedPercent to RayServiceStatus
- Add controller logic initial commit
- Add IncrementalUpgrade check to ShouldUpdate
- Update controller logic to reconcile incremental upgrade
- TrafficRoutedPercent should not set default value
- Remove test changes to TPU manifest
- Move helper function to utils
- Fix lint
- Fix field alignment
- Fix bad merge
- Fix CRDs and add validation test case
- Test create HTTPRoute and create Gateway
- Add reconcile tests for Gateway and HTTPRoute
- Fix lint
- Add tests for util functions and fix golangci-lint
- Add basic e2e test case
- Fix GetGatewayListeners logic and test
- Add gatewayv1 scheme to util runtime
- Check if IncrementalUpgrade is enabled before checking Gateway
- Fix reconcile logic for Gateway and HTTPRoute
- Add feature gate
- Always create Gateway and HTTPRoute for IncrementalUpgrade
- Fix target_capacity reconciliation logic
- Add additional unit tests
- Move e2e test and add another unit test
Hi @ryanaoleary, I think it would be safer to continuously send requests to the gateway (this can make sure no requests drop). You can also DM me on Ray Slack before this is merged if you think that's a faster way to coordinate.
During the upgrade process, there are scenarios where we drop requests; we have to figure out why.
--------------------------------------------------
Statistics after 200 requests: Success: 195, Failure: 5
--------------------------------------------------
Request #201: Success! Status: 200, Response: 10
Request #202: Success! Status: 200, Response: 10
Request #203: Failed! Status: 503, Response: no healthy upstream
Request #204: Success! Status: 200, Response: 10
Request #205: Success! Status: 200, Response: 10
Request #206: Success! Status: 200, Response: 10
Request #207: Success! Status: 200, Response: 10
Request #208: Failed! Status: 503, Response: no healthy upstream
Request #209: Failed! Status: 503, Response: no healthy upstream
Request #210: Success! Status: 200, Response: 10
Request #211: Success! Status: 200, Response: 10
Request #212: Success! Status: 200, Response: 10
Request #213: Success! Status: 200, Response: 10
Request #214: Success! Status: 200, Response: 10
Request #215: Failed! Status: 503, Response: no healthy upstream
This is my Python script so you can test serving requests while upgrading locally.
import subprocess
import time
import sys


def run_kubectl_command(cmd, check=True):
    """Execute kubectl command and return result"""
    try:
        # Execute command with check=True, if command fails it will throw an exception
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=check,
            encoding='utf-8'  # Explicitly specify encoding
        )
        return result
    except subprocess.CalledProcessError as e:
        print(f"Error running command: {' '.join(cmd)}")
        print(f"Stderr: {e.stderr}")
        return None
    except FileNotFoundError:
        print("Error: 'kubectl' command not found. Please ensure it is installed and in your PATH.")
        sys.exit(1)


def ensure_curl_pod_is_running(pod_name="curl", namespace="default"):
    """Check if Pod exists and is running, if not, create it"""
    # Check Pod status
    check_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "jsonpath='{.status.phase}'"]
    try:
        result = subprocess.run(check_cmd, capture_output=True, text=True, check=True, encoding='utf-8')
        # Remove single quotes
        status = result.stdout.strip("'")
        if status == "Running":
            print(f"Pod '{pod_name}' is already running.")
            return True
        else:
            print(f"Pod '{pod_name}' exists but is not running (Status: {status}). Deleting it to recreate.")
            delete_cmd = ["kubectl", "delete", "pod", pod_name, "-n", namespace, "--grace-period=0"]
            run_kubectl_command(delete_cmd, check=False)
            time.sleep(5)  # Wait for deletion to complete
    except subprocess.CalledProcessError:
        # get pod failed, means pod doesn't exist
        print(f"Pod '{pod_name}' not found. Creating it...")

    # Create Pod and execute a command that never ends
    run_cmd = [
        "kubectl", "run", pod_name,
        "--image=radial/busyboxplus:curl",
        "--restart=Never",
        "-n", namespace,
        "--", "sh", "-c", "tail -f /dev/null"  # Keep Pod running permanently
    ]
    if not run_kubectl_command(run_cmd):
        print(f"Failed to start pod '{pod_name}'.")
        return False

    # Wait for Pod to enter Running state
    print(f"Waiting for pod '{pod_name}' to be ready...")
    wait_cmd = ["kubectl", "wait", "--for=condition=Ready", f"pod/{pod_name}", "-n", namespace, "--timeout=120s"]
    if not run_kubectl_command(wait_cmd):
        print("Pod did not become ready in time.")
        return False
    print(f"Pod '{pod_name}' is ready!")
    return True


def send_curl_request(pod_name, request_count, namespace="default"):
    """Execute curl request in Pod"""
    # Update URL according to your requirements
    url = "http://192.168.8.201/fruit"
    headers = {
        "Content-Type": "application/json",
        "Host": "rayservice-incremental-upgrade.default.svc.cluster.local"
    }
    data = '["MANGO", 2]'
    # Build curl command to execute inside Pod
    curl_cmd = [
        "kubectl", "exec", pod_name, "-n", namespace, "--",
        "curl",
        "-s",  # Silent mode
        "-w", "\\n%{http_code}",  # Print HTTP status code on the last line of output
        "-X", "POST",
        "-H", f"Content-Type: {headers['Content-Type']}",
        "-H", f"Host: {headers['Host']}",
        "-d", data,
        url
    ]
    # print("curl_cmd: ", curl_cmd)
    result = run_kubectl_command(curl_cmd, check=False)
    if not result:
        print(f"Request #{request_count}: Failed to execute curl command in pod.")
        return False
    # Parse output from kubectl exec
    output = result.stdout.strip()
    if '\n' not in output:
        print(f"Request #{request_count}: Unexpected response format: {output}")
        if result.stderr:
            print(f"Stderr: {result.stderr.strip()}")
        return False
    # Split response body and status code
    parts = output.rsplit('\n', 1)
    response_body = parts[0]
    status_code_str = parts[1]
    try:
        status_code = int(status_code_str)
        # Check status code and response content
        if status_code == 200:
            print(f"Request #{request_count}: Success! Status: {status_code}, Response: {response_body}")
            return True
        else:
            print(f"Request #{request_count}: Failed! Status: {status_code}, Response: {response_body}")
            return False
    except ValueError:
        print(f"Request #{request_count}: Invalid status code '{status_code_str}' in response.")
        return False


def main():
    pod_name = "curl"
    # Step 1: Ensure curl pod exists and is running
    if not ensure_curl_pod_is_running(pod_name):
        print("Exiting due to failure in preparing the curl pod.")
        return

    request_count = 0
    success_count = 0
    failure_count = 0
    print("\nStarting to send requests... Press Ctrl+C to stop.")
    try:
        # Step 2: Enter loop, send request every 0.1 seconds
        while True:
            request_count += 1
            if send_curl_request(pod_name, request_count):
                success_count += 1
            else:
                failure_count += 1
            # Every 100 requests, display statistics once
            if request_count % 100 == 0:
                print("-" * 50)
                print(f"Statistics after {request_count} requests: Success: {success_count}, Failure: {failure_count}")
                print("-" * 50)
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nScript interrupted by user.")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
    finally:
        print("\nScript finished.")
        print(f"Final Statistics: Total: {request_count}, Success: {success_count}, Failure: {failure_count}")


# Note: The script will not automatically delete the Pod after completion, you can manually execute the following command to clean up:
# kubectl delete pod curl -n default
if __name__ == "__main__":
    main()
Upgrade scenario:
(screenshot of request statistics during the upgrade)
After 300 requests (1 request per 0.1 seconds), 7 failed, and I think this is not acceptable.
No-upgrade scenario:
(screenshot of request statistics without an upgrade)
I am not sure if this is a Ray Serve bug or a KubeRay bug; it needs discussion.
cc @ryanaoleary @andrewsykim @kevin85421 @rueian
Hi @joshdevins, are you interested in testing this PR before the release?
Sounds good, thank you!! I'll work on testing this more today before the community meeting, and I'll message if I have questions or any updates. I'm currently looking into the two issues you mentioned (#3166 (review) and #3166 (comment)) since I think they may be related.
Hi @ryanaoleary, I'm just checking in on this, as the branch cut date is approaching around 10/20. I wanted to let you know that I'm happy to help debug, brainstorm solutions, or work on the implementation with you if that would be helpful. Please let me know!
Sounds good, thank you!! I really appreciate the help. I just pushed f785c6e, which should fix the two main issues brought up in the comments.
I'll do some testing later today with the load-test Python script you provided to ensure no requests are dropped, but it is passing the e2e test consistently for me.
It seems correct!! I am testing more, but I am really excited about this progress.
(video attachment: 2025-10-10.15-54-37.mp4)
Hi @ryanaoleary, here is an async script.
- I think this is better than the synchronous one. I am thinking that you can probably do batch requests to a production cluster with a GPU workload and see what happens when these requests try to hit the limit while upgrading. This is the stress test I have in mind.
- I am also wondering if it is possible to test this PR with GPUs using Locust, following Ray's official documentation, since I also want to know the CPU and resource usage: https://docs.ray.io/en/latest/serve/autoscaling-guide.html#basic-example
- When using the script below, I triggered the incremental upgrade and got the response shown in the screenshot below. (I think this is flaky; you have to try multiple times. I believe that on average it fails about once every three tries.) I'm not sure whether this problem is related to incremental upgrade or not.
(screenshot of the failed response)
import asyncio
import subprocess
import time
import sys
import json
from typing import Tuple, Optional


async def run_kubectl_command_async(cmd, check=True):
    """Execute kubectl command asynchronously and return result"""
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if check and process.returncode != 0:
            print(f"Error running command: {' '.join(cmd)}")
            print(f"Stderr: {stderr.decode('utf-8')}")
            return None
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=process.returncode,
            stdout=stdout.decode('utf-8'),
            stderr=stderr.decode('utf-8')
        )
    except FileNotFoundError:
        print("Error: 'kubectl' command not found. Please ensure it is installed and in your PATH.")
        sys.exit(1)


async def ensure_curl_pod_is_running(pod_name="curl", namespace="default"):
    """Check if Pod exists and is running, if not, create it"""
    # Check Pod status
    check_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "jsonpath='{.status.phase}'"]
    try:
        result = await run_kubectl_command_async(check_cmd, check=True)
        if result:
            # Remove single quotes
            status = result.stdout.strip("'")
            if status == "Running":
                print(f"Pod '{pod_name}' is already running.")
                return True
            else:
                print(f"Pod '{pod_name}' exists but is not running (Status: {status}). Deleting it to recreate.")
                delete_cmd = ["kubectl", "delete", "pod", pod_name, "-n", namespace, "--grace-period=0"]
                await run_kubectl_command_async(delete_cmd, check=False)
                await asyncio.sleep(5)  # Wait for deletion to complete
    except Exception:
        # get pod failed, means pod doesn't exist
        print(f"Pod '{pod_name}' not found. Creating it...")

    # Create Pod and execute a command that never ends
    run_cmd = [
        "kubectl", "run", pod_name,
        "--image=radial/busyboxplus:curl",
        "--restart=Never",
        "-n", namespace,
        "--", "sh", "-c", "tail -f /dev/null"  # Keep Pod running permanently
    ]
    result = await run_kubectl_command_async(run_cmd)
    if not result:
        print(f"Failed to start pod '{pod_name}'.")
        return False

    # Wait for Pod to enter Running state
    print(f"Waiting for pod '{pod_name}' to be ready...")
    wait_cmd = ["kubectl", "wait", "--for=condition=Ready", f"pod/{pod_name}", "-n", namespace, "--timeout=120s"]
    result = await run_kubectl_command_async(wait_cmd)
    if not result:
        print("Pod did not become ready in time.")
        return False
    print(f"Pod '{pod_name}' is ready!")
    return True


async def send_curl_request(pod_name, request_count, namespace="default") -> Tuple[bool, int, str]:
    """Execute curl request in Pod and return (success, status_code, response_body)"""
    # Update URL according to your requirements
    url = "http://192.168.8.201/fruit"
    headers = {
        "Content-Type": "application/json",
        "Host": "rayservice-incremental-upgrade.default.svc.cluster.local"
    }
    data = '["MANGO", 2]'
    # Build curl command to execute inside Pod
    curl_cmd = [
        "kubectl", "exec", pod_name, "-n", namespace, "--",
        "curl",
        "-s",  # Silent mode
        "-w", "\\n%{http_code}",  # Print HTTP status code on the last line of output
        "-X", "POST",
        "-H", f"Content-Type: {headers['Content-Type']}",
        "-H", f"Host: {headers['Host']}",
        "-H", "Connection: close",
        "-d", data,
        url
    ]
    result = await run_kubectl_command_async(curl_cmd, check=False)
    if not result:
        print(f"Request #{request_count}: Failed to execute curl command in pod.")
        return False, 0, ""
    # Parse output from kubectl exec
    output = result.stdout.strip()
    if '\n' not in output:
        print(f"Request #{request_count}: Unexpected response format: {output}")
        if result.stderr:
            print(f"Stderr: {result.stderr.strip()}")
        return False, 0, output
    # Split response body and status code
    parts = output.rsplit('\n', 1)
    response_body = parts[0]
    status_code_str = parts[1]
    try:
        status_code = int(status_code_str)
        # Check status code and response content
        if status_code == 200:
            # print(f"Request #{request_count}: Success! Status: {status_code}, Response: {response_body}")
            return True, status_code, response_body
        else:
            print(f"Request #{request_count}: Failed! Status: {status_code}, Response: {response_body}")
            return False, status_code, response_body
    except ValueError:
        print(f"Request #{request_count}: Invalid status code '{status_code_str}' in response.")
        return False, 0, response_body


async def send_requests_batch(pod_name, start_count, batch_size=100, namespace="default"):
    """Send a batch of requests concurrently"""
    tasks = []
    for i in range(batch_size):
        request_count = start_count + i
        task = send_curl_request(pod_name, request_count, namespace)
        tasks.append(task)
    results = await asyncio.gather(*tasks, return_exceptions=True)

    success_count = 0
    failure_count = 0
    failed_requests = []
    for i, result in enumerate(results):
        request_count = start_count + i
        if isinstance(result, Exception):
            print(f"Request #{request_count}: Exception occurred: {result}")
            failure_count += 1
            failed_requests.append({
                'request_count': request_count,
                'status_code': 0,
                'response_body': str(result)
            })
        else:
            success, status_code, response_body = result
            if success:
                success_count += 1
            else:
                failure_count += 1
                failed_requests.append({
                    'request_count': request_count,
                    'status_code': status_code,
                    'response_body': response_body
                })
    return success_count, failure_count, failed_requests


async def main():
    pod_name = "curl"
    # Step 1: Ensure curl pod exists and is running
    if not await ensure_curl_pod_is_running(pod_name):
        print("Exiting due to failure in preparing the curl pod.")
        return

    total_request_count = 0
    total_success_count = 0
    total_failure_count = 0
    all_failed_requests = []
    print("\nStarting to send requests continuously... Press Ctrl+C to stop.")
    print("Using async batch processing for faster requests...")
    try:
        # Step 2: Enter continuous loop, send requests in batches
        batch_size = 10  # Number of concurrent requests per batch
        # delay_between_batches = 0.05  # Delay between batches in seconds
        while True:
            success_count, failure_count, failed_requests = await send_requests_batch(
                pod_name, total_request_count + 1, batch_size
            )
            total_request_count += batch_size
            total_success_count += success_count
            total_failure_count += failure_count
            all_failed_requests.extend(failed_requests)
            # Every 10 batches (100 requests), display statistics once
            if total_request_count % (batch_size * 10) == 0:
                print("-" * 50)
                print(f"Statistics after {total_request_count} requests: Success: {total_success_count}, Failure: {total_failure_count}")
                print("-" * 50)
            if total_failure_count > 0:
                print("FAILED REQUESTS DETAILS:")
                print("=" * 60)
                for failed_req in all_failed_requests:
                    print(f"Request #{failed_req['request_count']}: Status: {failed_req['status_code']}, Response: {failed_req['response_body']}")
                print("=" * 60)
                exit(1)
            # Wait before next batch
            # await asyncio.sleep(delay_between_batches)
    except KeyboardInterrupt:
        print("\nScript interrupted by user.")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
    finally:
        print("\nScript finished.")
        print(f"Final Statistics: Total: {total_request_count}, Success: {total_success_count}, Failure: {total_failure_count}")
        # Print all failed requests with details
        if all_failed_requests:
            print("\n" + "=" * 60)
            print("FAILED REQUESTS DETAILS:")
            print("=" * 60)
            for failed_req in all_failed_requests:
                print(f"Request #{failed_req['request_count']}: Status: {failed_req['status_code']}, Response: {failed_req['response_body']}")
            print("=" * 60)


# Note: The script will not automatically delete the Pod after completion, you can manually execute the following command to clean up:
# kubectl delete pod curl -n default
if __name__ == "__main__":
    asyncio.run(main())
I think this is the todo list for this PR:
1. If we are already upgrading from the old cluster (1st cluster) to a new cluster (2nd cluster), and a user now runs kubectl apply for a new cluster (3rd cluster), we should reject the request for the 3rd cluster, right? Do we handle this now?
2. Resolve my comments for better readability (or reply if my suggestion doesn't make sense, and I will take a look): #3166 (comment), #3166 (comment)
3. Rename IncrementalUpgradeOptions to ClusterUpgradeOptions: #3166 (comment)
4. Do more load testing; I think we should try Locust.
5. After 1, 2, 3, and 4 are done, make the e2e test work in Buildkite.
I need more time to check whether this change is correct or not; I will give you an update next Monday or Tuesday. Thank you.
I'll work on 3 and 4 and update this PR with the changes/results by Monday night.
I should have some GPU quota, so I'll try that test and update this PR with the results. As for the flaky test, I think it could be unrelated to the upgrade; the head Pod could be hitting memory or CPU limits. I'll try replicating it as well to debug further, but I suspect it's not unique to incremental upgrades.
// GetGatewayListenersForRayService constructs the default HTTP listener for a RayService Gateway.
func GetGatewayListenersForRayService(rayServiceInstance *rayv1.RayService) []gwv1.Listener {
    hostname := fmt.Sprintf("%s.%s.svc.cluster.local", rayServiceInstance.Name, rayServiceInstance.Namespace)
Does .cluster.local always work? IIRC, it is customizable.
IIUC, hostname can be set to whatever we want as long as it's unique and matches what users expect; it's not based on an actual service name or anything.
Is there somewhere in the RayCluster spec where users can specify a hostname? I can make this customizable based on that if so; I just didn't see a relevant field.
Okay, .cluster.local makes me think this is tied to a k8s service FQDN.
Shouldn't we just leave the hostname unspecified to accept all requests?
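To illustrate the two options being discussed, here is a sketch against the same gwv1 types; clusterDomain is a hypothetical parameter, since KubeRay doesn't currently expose the cluster's DNS suffix anywhere in the spec.

package sketch

import (
    "fmt"

    gwv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// listenerHostname returns nil to match any host (option 1), or a service-style
// FQDN built from a configurable DNS suffix (option 2). clusterDomain is a
// hypothetical knob; ".cluster.local" is only the common default.
func listenerHostname(name, namespace, clusterDomain string, acceptAll bool) *gwv1.Hostname {
    if acceptAll {
        return nil // an unset Hostname accepts requests for all hosts
    }
    h := gwv1.Hostname(fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain))
    return &h
}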
Hi @ryanaoleary, I will do it too. See you next week, and thank you for the hard work.
todo:
if incrementalUpgradeUpdate {
    if err := r.reconcileServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient); err != nil {
Hi @ryanaoleary,
Since isIncrementalUpgradeInProgress already includes incrementalUpgradeUpdate, can we delete the if incrementalUpgradeUpdate statement?
isIncrementalUpgradeInProgress := utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) &&
    rayServiceInstance.Status.PendingServiceStatus.RayClusterName != ""
Can you explain the difference between this statement and the following one? Can we be consistent? If not, why?
isIncrementalUpgradeInProgress := utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) && meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress))
promotedPendingCluster := false
if headSvc != nil && serveSvc != nil {
Can we change the name to isPendingClusterPromoted?
if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) && meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress)) {
    // An incremental upgrade is complete when the active cluster has 0% capacity and the pending cluster has
    // 100% of the traffic. We can't promote the pending cluster until traffic has been fully migrated.
    if pendingCluster != nil &&
        ptr.Deref(rayServiceInstance.Status.ActiveServiceStatus.TargetCapacity, -1) == 0 &&
        ptr.Deref(rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent, -1) == 100 {

        logger.Info("Promoting pending cluster to active: Incremental upgrade complete.",
            "oldCluster", rayServiceInstance.Status.ActiveServiceStatus.RayClusterName,
            "newCluster", rayServiceInstance.Status.PendingServiceStatus.RayClusterName)

        rayServiceInstance.Status.ActiveServiceStatus = rayServiceInstance.Status.PendingServiceStatus
        rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
        promotedPendingCluster = true
    }
}
if !utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) || !promotedPendingCluster {
    // Promote the pending cluster to the active cluster if both RayService's head and serve services
    // have already pointed to the pending cluster.
    clusterName := utils.GetRayClusterNameFromService(headSvc)
    if clusterName != utils.GetRayClusterNameFromService(serveSvc) {
        panic("headSvc and serveSvc are not pointing to the same cluster")
    }
    // Verify cluster name matches either pending or active cluster
    if clusterName != pendingClusterName && clusterName != activeClusterName {
        panic("clusterName is not equal to pendingCluster or activeCluster")
    }
    isPendingClusterServing = clusterName == pendingClusterName

    // If services point to a different cluster than the active one, promote pending to active
    logger.Info("calculateStatus", "clusterSvcPointingTo", clusterName, "pendingClusterName", pendingClusterName, "activeClusterName", activeClusterName)
    if activeClusterName != clusterName {
        logger.Info("Promoting pending cluster to active",
            "oldCluster", rayServiceInstance.Status.ActiveServiceStatus.RayClusterName,
            "newCluster", clusterName)
        rayServiceInstance.Status.ActiveServiceStatus = rayServiceInstance.Status.PendingServiceStatus
        rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
    }
}
Can we do something like this? If we don't, it will be hard for developers to understand the code. You can change these helper functions' names; I just want to give a direction.
// Check if incremental upgrade is enabled in the RayService specification
if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) {
    logger.Info("Processing incremental upgrade strategy")
    // Check if incremental upgrade is currently in progress
    if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress)) {
        // Incremental upgrade is in progress
        logger.Info("Incremental upgrade is in progress")
        // Check if incremental upgrade completion conditions are met
        if isIncrementalUpgradeComplete(rayServiceInstance, pendingCluster) {
            // All conditions met: active cluster has 0% capacity and
            // pending cluster has 100% traffic
            logger.Info("Incremental upgrade completed, promoting pending cluster")
            // Execute the promotion logic
            promotePendingClusterToActive(rayServiceInstance)
            isPendingClusterPromoted = true
            // No need to execute traditional logic after successful promotion
            return
        } else {
            logger.Info("Incremental upgrade not complete, waiting for completion",
                "activeTargetCapacity", ptr.Deref(rayServiceInstance.Status.ActiveServiceStatus.TargetCapacity, -1),
                "pendingTrafficRoutedPercent", ptr.Deref(rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent, -1))
            // Continue with traditional logic as fallback during upgrade
            executeTraditionalUpgradeLogic(headSvc, serveSvc, rayServiceInstance)
        }
    } else {
        // Incremental upgrade is enabled but not started yet
        logger.Info("Incremental upgrade not started, using traditional logic")
        executeTraditionalUpgradeLogic(headSvc, serveSvc, rayServiceInstance)
    }
} else {
    // Incremental upgrade is not enabled, use traditional Blue/Green strategy
    logger.Info("Processing traditional Blue/Green upgrade strategy")
    executeTraditionalUpgradeLogic(headSvc, serveSvc, rayServiceInstance)
}

func isIncrementalUpgradeComplete(rayServiceInstance *rayv1.RayService, pendingCluster *rayv1.RayCluster) bool {
    return pendingCluster != nil &&
        ptr.Deref(rayServiceInstance.Status.ActiveServiceStatus.TargetCapacity, -1) == 0 &&
        ptr.Deref(rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent, -1) == 100
}

func promotePendingClusterToActive(rayServiceInstance *rayv1.RayService) {
    logger.Info("Promoting pending cluster to active: Incremental upgrade complete.",
        "oldCluster", rayServiceInstance.Status.ActiveServiceStatus.RayClusterName,
        "newCluster", rayServiceInstance.Status.PendingServiceStatus.RayClusterName)
    // Perform the actual status swap
    rayServiceInstance.Status.ActiveServiceStatus = rayServiceInstance.Status.PendingServiceStatus
    rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
}

func executeTraditionalUpgradeLogic(headSvc, serveSvc *corev1.Service, rayServiceInstance *rayv1.RayService) {
    logger.Info("Executing traditional Blue/Green upgrade logic")
    // Validate that both services exist
    if headSvc == nil || serveSvc == nil {
        logger.Info("Services not ready, skipping traditional upgrade logic")
        return
    }
    // Step 1: Service Consistency Check
    // Ensure head and serve services point to the same cluster
    if err := validateServiceConsistency(headSvc, serveSvc); err != nil {
        logger.Error(err, "Service consistency validation failed")
        return
    }
    // Step 2: Cluster Switching Logic
    // Determine which cluster the services are currently pointing to
    clusterName := utils.GetRayClusterNameFromService(headSvc)
    pendingClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName
    activeClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
    // Update the serving status
    isPendingClusterServing = clusterName == pendingClusterName
    // Step 3: Execute Cluster Promotion (if needed)
    // If services are pointing to a different cluster than the current active one
    if activeClusterName != clusterName {
        logger.Info("Promoting pending cluster to active (Traditional upgrade)",
            "oldCluster", activeClusterName,
            "newCluster", clusterName)
        // Perform the cluster status swap
        rayServiceInstance.Status.ActiveServiceStatus = rayServiceInstance.Status.PendingServiceStatus
        rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
    }
}

func validateServiceConsistency(headSvc, serveSvc *corev1.Service) error {
    clusterName := utils.GetRayClusterNameFromService(headSvc)
    serveSvcClusterName := utils.GetRayClusterNameFromService(serveSvc)
    // Check if both services point to the same cluster
    if clusterName != serveSvcClusterName {
        return fmt.Errorf("service inconsistency detected: head service points to %s but serve service points to %s",
            clusterName, serveSvcClusterName)
    }
    return nil
}
Hi @ryanaoleary,
I tested this PR using Locust on my local env. I think we can use Locust to test this feature, but we don't have to do that in this PR:
https://github.com/ray-project/kuberay/blob/master/ray-operator/test/e2erayservice/rayservice_ha_test.go
- my Locust test proof: #3166 (comment)
- rename IncrementalUpgrade to ClusterUpgrade: #3166 (comment)
- do the refactor I mentioned here: #3166 (review)
- Could you explain commit f785c6e? I read it yesterday, and I want to make sure my understanding matches yours.
After these are done, I think we can get this merged.
cc @rueian @andrewsykim to align our understanding.
Why are these changes needed?
This PR implements an alpha version of the RayService Incremental Upgrade REP.
The RayService controller logic to reconcile a RayService during an incremental upgrade is as follows:
1. Validate the IncrementalUpgradeOptions and accept/reject the RayService CR accordingly.
2. reconcileGateway - on the first call this creates a new Gateway CR; subsequent calls update the Listeners as necessary based on any changes to the RayService.Spec.
3. reconcileHTTPRoute - on the first call this creates an HTTPRoute CR with two backendRefs, one pointing to the old cluster and one to the pending cluster, with weights 100 and 0 respectively. Every subsequent call to reconcileHTTPRoute updates the HTTPRoute by changing the weight of each backendRef by StepSizePercent until the weight associated with each cluster equals the TargetCapacity associated with that cluster. The backendRef weight is exposed through the RayService status field TrafficRoutedPercent. The weight is only changed if at least IntervalSeconds have elapsed since RayService.Status.LastTrafficMigratedTime; otherwise the controller waits until the next iteration and checks again. (A minimal sketch of this stepping rule follows at the end of this description.)
4. Check whether TrafficRoutedPercent == TargetCapacity; if so, the target_capacity can be updated for one of the clusters.
5. reconcileServeTargetCapacity - if the total target_capacity of both Serve configs is less than or equal to 100%, the pending cluster's target_capacity can be safely scaled up by MaxSurgePercent. If the total target_capacity is greater than 100%, the active cluster's target_capacity can be decreased by MaxSurgePercent.
6. Once the TargetCapacity and TrafficRoutedPercent of the pending RayService instance's RayCluster equal 100%, the upgrade is complete.

Related issue number
#3209
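To make the weight-stepping rule in step 3 concrete, here is a minimal self-contained sketch; the helper name and signature are illustrative, not the PR's actual code.

package sketch

import "time"

// nextWeight moves a backendRef weight toward its target by at most
// stepSizePercent, and only if intervalSeconds have elapsed since the last
// traffic migration. It returns the new weight and whether a step was taken.
func nextWeight(current, target, stepSizePercent int32, lastMigrated time.Time, intervalSeconds int32, now time.Time) (int32, bool) {
    if now.Sub(lastMigrated) < time.Duration(intervalSeconds)*time.Second {
        return current, false // too soon since LastTrafficMigratedTime; retry on the next reconcile
    }
    switch {
    case current < target:
        return min(current+stepSizePercent, target), true // step up, clamped at the target
    case current > target:
        return max(current-stepSizePercent, target), true // step down, clamped at the target
    default:
        return current, false // weight already matches TargetCapacity
    }
}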