Skip to content

Commit 06b3204

Browse files
authored
[Feat][Router] Add configurable timeout_seconds for Kubernetes watchers (vllm-project#654)
* Add --k8s-timeout-seconds argument for Kubernetes watcher streams Signed-off-by: Ifta Khairul Alam Adil <[email protected]> * Enhance Kubernetes service discovery by adding configurable timeout for watcher streams - Introduced parameter in and classes to allow dynamic timeout settings. - Updated relevant methods to utilize the new timeout configuration instead of hard-coded values. Signed-off-by: Ifta Khairul Alam Adil <[email protected]> * Validate k8s-timeout-seconds argument to ensure it is greater than 0 in the parser. This change enhances error handling for Kubernetes service discovery configurations. Signed-off-by: Ifta Khairul Alam Adil <[email protected]> * Refactor Kubernetes service discovery timeout parameters to use 'watcher_timeout_seconds' instead of 'timeout_seconds'. Updated related validation and argument parsing to reflect this change, enhancing clarity and configurability for watcher stream timeouts. Signed-off-by: Ifta Khairul Alam Adil <[email protected]> --------- Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
1 parent 8a0cb98 commit 06b3204

File tree

3 files changed

+17
-6
lines changed

3 files changed

+17
-6
lines changed

src/vllm_router/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ def initialize_all(app: FastAPI, args):
164164
label_selector=args.k8s_label_selector,
165165
prefill_model_labels=args.prefill_model_labels,
166166
decode_model_labels=args.decode_model_labels,
167+
watcher_timeout_seconds=args.k8s_watcher_timeout_seconds,
167168
)
168169

169170
else:

src/vllm_router/parsers/parser.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ def validate_args(args):
9797
validate_static_model_types(args.static_model_types)
9898
if args.service_discovery == "k8s" and args.k8s_port is None:
9999
raise ValueError("K8s port must be provided when using K8s service discovery.")
100+
if args.k8s_watcher_timeout_seconds <= 0:
101+
raise ValueError("k8s-watcher-timeout-seconds must be greater than 0.")
100102
if args.routing_logic == "session" and args.session_key is None:
101103
raise ValueError(
102104
"Session key must be provided when using session routing logic."
@@ -188,6 +190,12 @@ def parse_args():
188190
default="",
189191
help="The label selector to filter vLLM pods when using K8s service discovery.",
190192
)
193+
parser.add_argument(
194+
"--k8s-watcher-timeout-seconds",
195+
type=int,
196+
default=30,
197+
help="Timeout in seconds for Kubernetes watcher streams (default: 30).",
198+
)
191199
parser.add_argument(
192200
"--routing-logic",
193201
type=str,

src/vllm_router/service_discovery.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ def __init__(
350350
label_selector=None,
351351
prefill_model_labels: List[str] | None = None,
352352
decode_model_labels: List[str] | None = None,
353+
watcher_timeout_seconds: int = 30,
353354
):
354355
"""
355356
Initialize the Kubernetes service discovery module. This module
@@ -363,13 +364,15 @@ def __init__(
363364
namespace: the namespace of the engine pods
364365
port: the port of the engines
365366
label_selector: the label selector of the engines
367+
watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 30)
366368
"""
367369
self.app = app
368370
self.namespace = namespace
369371
self.port = port
370372
self.available_engines: Dict[str, EndpointInfo] = {}
371373
self.available_engines_lock = threading.Lock()
372374
self.label_selector = label_selector
375+
self.watcher_timeout_seconds = watcher_timeout_seconds
373376

374377
# Init kubernetes watcher
375378
try:
@@ -566,15 +569,13 @@ def _get_model_label(self, pod) -> Optional[str]:
566569
return pod.metadata.labels.get("model")
567570

568571
def _watch_engines(self):
569-
# TODO (ApostaC): remove the hard-coded timeouts
570-
571572
while self.running:
572573
try:
573574
for event in self.k8s_watcher.stream(
574575
self.k8s_api.list_namespaced_pod,
575576
namespace=self.namespace,
576577
label_selector=self.label_selector,
577-
timeout_seconds=30,
578+
timeout_seconds=self.watcher_timeout_seconds,
578579
):
579580
pod = event["object"]
580581
event_type = event["type"]
@@ -754,6 +755,7 @@ def __init__(
754755
label_selector=None,
755756
prefill_model_labels: List[str] | None = None,
756757
decode_model_labels: List[str] | None = None,
758+
watcher_timeout_seconds: int = 30,
757759
):
758760
"""
759761
Initialize the Kubernetes service discovery module. This module
@@ -782,13 +784,15 @@ def __init__(
782784
namespace: the namespace of the engine services
783785
port: the port of the engines
784786
label_selector: the label selector of the engines
787+
watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 30)
785788
"""
786789
self.app = app
787790
self.namespace = namespace
788791
self.port = port
789792
self.available_engines: Dict[str, EndpointInfo] = {}
790793
self.available_engines_lock = threading.Lock()
791794
self.label_selector = label_selector
795+
self.watcher_timeout_seconds = watcher_timeout_seconds
792796

793797
# Init kubernetes watcher
794798
try:
@@ -988,15 +992,13 @@ def _get_model_label(self, service) -> Optional[str]:
988992
return service.spec.selector.get("model")
989993

990994
def _watch_engines(self):
991-
# TODO (ApostaC): remove the hard-coded timeouts
992-
993995
while self.running:
994996
try:
995997
for event in self.k8s_watcher.stream(
996998
self.k8s_api.list_namespaced_service,
997999
namespace=self.namespace,
9981000
label_selector=self.label_selector,
999-
timeout_seconds=30,
1001+
timeout_seconds=self.watcher_timeout_seconds,
10001002
):
10011003
service = event["object"]
10021004
event_type = event["type"]

0 commit comments

Comments
 (0)