Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
validate_route_prefix,
)
from ray.serve.api import ASGIAppReplicaWrapper
from ray.serve.config import AutoscalingConfig
from ray.serve.config import AutoscalingConfig, AutoscalingPolicy
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import (
ApplicationStatus as ApplicationStatusProto,
Expand Down Expand Up @@ -332,6 +332,16 @@ def recover_target_state_from_checkpoint(
if checkpoint_data.deployment_infos is not None:
self._route_prefix = self._check_routes(checkpoint_data.deployment_infos)

# Restore app-level autoscaling policy from checkpoint
if (
checkpoint_data.config
and checkpoint_data.config.autoscaling_policy is not None
):
self._autoscaling_state_manager.register_application(
self._name,
AutoscalingPolicy(**checkpoint_data.config.autoscaling_policy),
)

def _set_target_state(
self,
deployment_infos: Optional[Dict[str, DeploymentInfo]],
Expand Down Expand Up @@ -438,7 +448,6 @@ def is_deleted(self) -> bool:

def should_autoscale(self) -> bool:
"""Determine if autoscaling should be enabled for the application."""

return self._autoscaling_state_manager.should_autoscale_application(self._name)

def autoscale(self) -> bool:
Expand Down Expand Up @@ -876,6 +885,17 @@ def update(self) -> Tuple[bool, bool]:
self._build_app_task_info.target_capacity_direction
),
)
# Handling the case where the user turns off/turns on app-level autoscaling policy,
# between app deployment.
if self._target_state.config.autoscaling_policy is not None:
self._autoscaling_state_manager.register_application(
self._name,
AutoscalingPolicy(
**self._target_state.config.autoscaling_policy
),
)
else:
self._autoscaling_state_manager.deregister_application(self._name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have a lingering registration if the same app_name was built with BuildAppStatus.SUCCEEDED the first time, but failed to build the second time around?

elif task_status == BuildAppStatus.FAILED:
self._update_status(ApplicationStatus.DEPLOY_FAILED, msg)

Expand All @@ -891,7 +911,10 @@ def update(self) -> Tuple[bool, bool]:

# Check if app is ready to be deleted
if self._target_state.deleting:
return self.is_deleted(), target_state_changed
is_deleted = self.is_deleted()
if is_deleted:
self._autoscaling_state_manager.deregister_application(self._name)
return is_deleted, target_state_changed
return False, target_state_changed

def get_checkpoint_data(self) -> ApplicationTargetState:
Expand Down
109 changes: 92 additions & 17 deletions python/ray/serve/_private/autoscaling_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
merge_timeseries_dicts,
)
from ray.serve._private.utils import get_capacity_adjusted_num_replicas
from ray.serve.config import AutoscalingContext
from ray.serve.config import AutoscalingContext, AutoscalingPolicy

logger = logging.getLogger(SERVE_LOGGER_NAME)

Expand Down Expand Up @@ -577,11 +577,29 @@ def __init__(
self._deployment_autoscaling_states: Dict[
DeploymentID, DeploymentAutoscalingState
] = {}
self._policy: Optional[
Callable[
[Dict[DeploymentID, AutoscalingContext]],
Tuple[Dict[DeploymentID, int], Optional[Dict[str, Dict]]],
]
] = None
self._policy_state: Optional[Dict[str, Any]] = None

@property
def deployments(self):
return self._deployment_autoscaling_states.keys()

def register(
self,
autoscaling_policy: AutoscalingPolicy,
):
"""Register or update application-level autoscaling config and deployments."""
self._policy = autoscaling_policy.get_policy()
self._policy_state = {}

def has_policy(self) -> bool:
return self._policy is not None

def register_deployment(
self,
deployment_id: DeploymentID,
Expand All @@ -599,14 +617,13 @@ def register_deployment(
curr_target_num_replicas,
)

def deregister_deployment(self, deployment_id: DeploymentID) -> int:
def deregister_deployment(self, deployment_id: DeploymentID):
if deployment_id not in self._deployment_autoscaling_states:
logger.warning(
f"Cannot deregister autoscaling state for deployment {deployment_id} because it is not registered"
)
return len(self._deployment_autoscaling_states)
return
self._deployment_autoscaling_states.pop(deployment_id)
return len(self._deployment_autoscaling_states)

def should_autoscale_deployment(self, deployment_id: DeploymentID):
return deployment_id in self._deployment_autoscaling_states
Expand All @@ -620,15 +637,43 @@ def get_decision_num_replicas(
Decide scaling for all deployments in this application by calling
each deployment's autoscaling policy.
"""
return {
deployment_id: deployment_autoscaling_state.get_decision_num_replicas(
curr_target_num_replicas=deployment_to_target_num_replicas[
deployment_id
],
_skip_bound_check=_skip_bound_check,
)
for deployment_id, deployment_autoscaling_state in self._deployment_autoscaling_states.items()
}
if self.has_policy():
# Using app-level policy
autoscaling_contexts = {
deployment_id: state.get_autoscaling_context(
deployment_to_target_num_replicas[deployment_id]
)
for deployment_id, state in self._deployment_autoscaling_states.items()
}

# Policy returns {deployment_name -> decision}
decisions, self._policy_state = self._policy(autoscaling_contexts)

assert (
type(decisions) is dict
), "Autoscaling policy must return a dictionary of deployment_name -> decision_num_replicas"

return {
deployment_id: (
self._deployment_autoscaling_states[deployment_id].apply_bounds(
num_replicas
)
if not _skip_bound_check
else num_replicas
)
for deployment_id, num_replicas in decisions.items()
}
else:
# Using deployment-level policy
return {
deployment_id: deployment_autoscaling_state.get_decision_num_replicas(
curr_target_num_replicas=deployment_to_target_num_replicas[
deployment_id
],
_skip_bound_check=_skip_bound_check,
)
for deployment_id, deployment_autoscaling_state in self._deployment_autoscaling_states.items()
}

def update_running_replica_ids(
self, deployment_id: DeploymentID, running_replicas: List[ReplicaID]
Expand Down Expand Up @@ -714,6 +759,7 @@ def register_deployment(
app_state = self._app_autoscaling_states.setdefault(
app_name, ApplicationAutoscalingState(app_name)
)
logger.info(f"Registering autoscaling state for deployment {deployment_id}")
return app_state.register_deployment(
deployment_id, info, curr_target_num_replicas
)
Expand All @@ -722,10 +768,39 @@ def deregister_deployment(self, deployment_id: DeploymentID):
"""Remove deployment from tracking."""
app_state = self._app_autoscaling_states.get(deployment_id.app_name)
if app_state:
num_deployments = app_state.deregister_deployment(deployment_id)
if num_deployments == 0:
# Clean up the app_name entry if no deployments are left
del self._app_autoscaling_states[deployment_id.app_name]
logger.info(
f"Deregistering autoscaling state for deployment {deployment_id}"
)
app_state.deregister_deployment(deployment_id)

def register_application(
self,
app_name: ApplicationName,
autoscaling_policy: AutoscalingPolicy,
):
app_state = self._app_autoscaling_states.setdefault(
app_name, ApplicationAutoscalingState(app_name)
)
logger.info(f"Registering autoscaling state for application {app_name}")
app_state.register(autoscaling_policy)

def deregister_application(self, app_name: ApplicationName):
"""Remove application from tracking."""
if app_name in self._app_autoscaling_states:
logger.info(f"Deregistering autoscaling state for application {app_name}")
self._app_autoscaling_states.pop(app_name, None)
else:
logger.warning(
f"Cannot deregister autoscaling state for application {app_name} because it is not registered"
)
self._policy = None
self._policy_state = None

def application_has_policy(self, app_name: ApplicationName) -> bool:
return (
app_name in self._app_autoscaling_states
and self._app_autoscaling_states[app_name].has_policy()
)

def get_decision_num_replicas(
self,
Expand Down
10 changes: 10 additions & 0 deletions python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,21 @@ def get_app_code_version(app_config: ServeApplicationSchema) -> str:
Returns: a hash of the import path and (application level) runtime env representing
the code version of the application.
"""
deployment_autoscaling_policies = [
deployment_config.autoscaling_config.get("policy", None)
for deployment_config in app_config.deployments
if isinstance(deployment_config.autoscaling_config, dict)
]
encoded = json.dumps(
{
"import_path": app_config.import_path,
"runtime_env": app_config.runtime_env,
"args": app_config.args,
# NOTE: trigger a change in the code version when
# application level autoscaling policy is changed or
# any one of the deployment level autoscaling policy is changed
"autoscaling_policy": app_config.autoscaling_policy,
"deployment_autoscaling_policies": deployment_autoscaling_policies,
},
sort_keys=True,
).encode("utf-8")
Expand Down
Loading