feat: dp rank routing #3597
base: main
Conversation
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Walkthrough
Adds optional dp_rank propagation end-to-end: request preprocessing, Python worker handlers, engine init, routing/scheduling/indexing, sequence coordination, and KV event publishing/recording. Introduces the WorkerWithDpRank type, extends multiple public structs/APIs, and adds data_parallel_size to the runtime config and its bindings.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Router as StandaloneRouterHandler
    participant Kv as KvPushRouter
    participant Sched as KvScheduler
    participant Seq as ActiveSequences
    Note over Client,Router: Request includes optional dp_rank
    Client->>Router: generate(request{..., dp_rank?})
    Router->>Kv: add_request(preprocessed{..., dp_rank})
    Kv->>Sched: schedule(..., worker: WorkerWithDpRank)
    Sched-->>Kv: best_worker: WorkerWithDpRank
    Kv->>Seq: AddRequest(worker: WorkerWithDpRank)
    Seq-->>Kv: Ack
    Kv-->>Router: RoutingDecision(worker_id, dp_rank?)
    Router-->>Client: Response (routed)
```

```mermaid
sequenceDiagram
    autonumber
    participant Frontend as Decode/Prefill Handler
    participant Engine as EngineClient
    Note over Frontend,Engine: Pass data_parallel_rank to token generation
    Frontend->>Frontend: dp_rank = request.get("dp_rank")
    Frontend->>Engine: generate(..., data_parallel_rank=dp_rank)
    Engine-->>Frontend: token stream
    Frontend-->>Frontend: publish KV events (dp_rank)
```

```mermaid
sequenceDiagram
    autonumber
    participant Engine
    participant Pub as KvEventPublisher
    participant Router as Kv Router/Indexer
    Note over Engine,Pub: KV events carry dp_rank
    Engine->>Pub: BlockStored/Removed(..., dp_rank?)
    Pub->>Router: KvCacheEvent{..., dp_rank}
    Router->>Router: Update overlaps by WorkerWithDpRank
    Router-->>Pub: Done
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
lib/bindings/c/src/lib.rs (1)
204-212: Plumb dp_rank in C bindings
C bindings always set `KvCacheEvent.dp_rank = None` (lib/bindings/c/src/lib.rs:207–212, 225–230); without dp_rank stamping in `KvEventPublisher`, the DP rank is lost. Add a `dp_rank` parameter to the C API and pass `Some(dp_rank)` when constructing `KvCacheEvent`.
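A self-contained sketch of the sentinel-to-Option conversion such a C parameter would need; the helper name and sentinel convention are illustrative, not the repo's actual API:

```rust
/// Hypothetical helper, not the actual binding: C callers cannot pass
/// Option<u32>, so a negative sentinel maps to "no dp_rank" on the Rust side.
fn dp_rank_from_c(dp_rank: i64) -> Option<u32> {
    // -1 (or any negative/oversized value) becomes None; valid ranks pass through.
    u32::try_from(dp_rank).ok()
}

fn main() {
    assert_eq!(dp_rank_from_c(-1), None);
    assert_eq!(dp_rank_from_c(3), Some(3));
}
```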
lib/bindings/python/rust/llm/kv.rs (1)
241-269: Update Python stubs for dp_rank
In lib/bindings/python/src/dynamo/_core.pyi:
- Add `dp_rank: Optional[int] = None` to `KvEventPublisher.__init__(self, component: Component, worker_id: int, kv_block_size: int, dp_rank: Optional[int] = None) -> None`
- Change `ApproxKvIndexer.process_routing_decision_for_request(self, tokens: List[int], lora_id: int, worker_id: int) -> None` to `process_routing_decision_for_request(self, tokens: List[int], worker_id: int, dp_rank: Optional[int] = None) -> Any`
lib/llm/src/kv_router/sequence.rs (4)
499-517: Avoid inserting mapping when we cannot service the worker locally
Currently inserts into request_to_worker before verifying a local sender exists, which leaves stale mappings. Reorder to insert only when the sender is present, and add clearer logging.

```diff
-            request_to_worker.insert(event.request_id.clone(), event.worker);
-
-            if let Some(sender) = senders.get(&event.worker.worker_id) {
+            if let Some(sender) = senders.get(&event.worker.worker_id) {
+                request_to_worker.insert(event.request_id.clone(), event.worker);
                 // For replicated events, we create a dummy response channel since we don't need to handle expired requests
                 let (resp_tx, _) = tokio::sync::oneshot::channel();
                 let _ = sender.send(UpdateSequences::AddRequest {
                     request_id: event.request_id.clone(),
                     token_sequence: token_sequence.clone(),
                     isl: *isl,
                     overlap: *overlap,
                     resp_tx,
                 });
             } else {
                 tracing::warn!(
-                    "Worker {:?} not found, cannot process AddRequest",
-                    event.worker
+                    "Worker {:?} not found, skipping AddRequest mapping for {}",
+                    event.worker,
+                    event.request_id
                 );
             }
```
528-535: Harden MarkPrefillCompleted path and add diagnostics
Similar let-chain risk; add separate checks and warn if the sender is missing.

```diff
-            if let Some(worker) = request_to_worker.get(&event.request_id)
-                && let Some(sender) = senders.get(&worker.worker_id)
-            {
-                let _ = sender.send(UpdateSequences::MarkPrefillCompleted {
-                    request_id: event.request_id.clone(),
-                });
-            }
+            if let Some(worker) = request_to_worker.get(&event.request_id) {
+                if let Some(sender) = senders.get(&worker.worker_id) {
+                    let _ = sender.send(UpdateSequences::MarkPrefillCompleted {
+                        request_id: event.request_id.clone(),
+                    });
+                } else {
+                    tracing::warn!(
+                        "Sender for worker {:?} not found for MarkPrefillCompleted on {}",
+                        worker,
+                        event.request_id
+                    );
+                }
+            }
```
668-676: Avoid unwrap in free() to prevent panics under worker churn
A race with update_workers can drop the sender; handle it gracefully and surface an error.

```diff
-        self.senders
-            .get(&worker.worker_id)
-            .unwrap()
-            .send(UpdateSequences::Free {
-                request_id: request_id.clone(),
-            })
-            .map_err(|_| anyhow::anyhow!("Failed to send free command to worker"))?;
+        if let Some(sender) = self.senders.get(&worker.worker_id) {
+            sender
+                .send(UpdateSequences::Free {
+                    request_id: request_id.clone(),
+                })
+                .map_err(|_| anyhow::anyhow!("Failed to send free command to worker"))?;
+        } else {
+            return Err(anyhow::anyhow!(
+                "Worker {} (dp_rank {:?}) not found for free()",
+                worker.worker_id,
+                worker.dp_rank
+            ));
+        }
```
704-713: Avoid unwrap in mark_prefill_completed()
Same unwrap hazard; add safe handling and a clearer error.

```diff
-        self.senders
-            .get(&worker.worker_id)
-            .unwrap()
-            .send(UpdateSequences::MarkPrefillCompleted {
-                request_id: request_id.clone(),
-            })
-            .map_err(|_| {
-                anyhow::anyhow!("Failed to send mark_prefill_completed command to worker")
-            })?;
+        if let Some(sender) = self.senders.get(&worker.worker_id) {
+            sender
+                .send(UpdateSequences::MarkPrefillCompleted {
+                    request_id: request_id.clone(),
+                })
+                .map_err(|_| {
+                    anyhow::anyhow!("Failed to send mark_prefill_completed command to worker")
+                })?;
+        } else {
+            return Err(anyhow::anyhow!(
+                "Worker {} (dp_rank {:?}) not found for mark_prefill_completed()",
+                worker.worker_id,
+                worker.dp_rank
+            ));
+        }
```
🧹 Nitpick comments (12)
lib/llm/src/local_model/runtime_config.rs (1)
22-24: Add data_parallel_size: looks good; consider basic validation
Optional: add a validation step (constructor or loader) to ensure `Some(v)` implies `v >= 1`, and elsewhere assert `dp_rank < data_parallel_size` when both are set, as sketched below.
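A minimal sketch of that check, assuming only the field under discussion (the real RuntimeConfig has more fields and this helper does not exist in the repo):

```rust
// Sketch only: stand-in struct with just the new field.
struct RuntimeConfig {
    data_parallel_size: Option<u32>,
}

impl RuntimeConfig {
    // Ensure Some(v) implies v >= 1, and dp_rank < data_parallel_size when both are set.
    fn validate(&self, dp_rank: Option<u32>) -> Result<(), String> {
        match (self.data_parallel_size, dp_rank) {
            (Some(0), _) => Err("data_parallel_size must be >= 1".into()),
            (Some(size), Some(rank)) if rank >= size => {
                Err(format!("dp_rank {rank} out of range for data_parallel_size {size}"))
            }
            _ => Ok(()),
        }
    }
}

fn main() {
    let cfg = RuntimeConfig { data_parallel_size: Some(4) };
    assert!(cfg.validate(Some(3)).is_ok());
    assert!(cfg.validate(Some(4)).is_err());
}
```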
lib/bindings/python/rust/llm/local_model.rs (1)
47-50: Setter addition is correct; consider adding a getter for parity
Expose a `#[getter] data_parallel_size() -> Option<u32>` for symmetry with other fields and easier introspection from Python.
Example snippet to add elsewhere in this impl:

```rust
#[getter]
fn data_parallel_size(&self) -> Option<u32> {
    self.inner.data_parallel_size
}
```

Based on learnings.
components/src/dynamo/vllm/main.py (1)
302-308: Good addition; minor behavior tweak optional
Consider setting the value when >= 1 (not just > 1) to make world size explicit even for single-rank setups, and log the chosen size for clarity.
components/src/dynamo/router/__main__.py (1)
102-104: Propagate with basic validation of dp_rank
Consider normalizing dp_rank to a non-negative int (or None) before passing it along to avoid downstream type/range errors.
Example minimal change:

```diff
-        "dp_rank": request.get("dp_rank"),
+        "dp_rank": (lambda v: (int(v) if v is not None and int(v) >= 0 else None))(request.get("dp_rank")),
```

Optionally, clamp/validate against the configured data_parallel_size if available.
components/src/dynamo/vllm/handlers.py (2)
215-220: Avoid catching bare Exception during prefill fallback
Catching Exception hides actionable errors. Narrow the scope (e.g., EngineDeadError, ValueError, KeyError, network errors) or log with exc_info for debugging.
Example:
- `except (EngineDeadError, ValueError, KeyError) as e: ...`
- Or: `logger.warning("Prefill error", exc_info=True)`
254-261: Prefill path: also normalize dp_rank and pass to engine safely
Mirror the decode path normalization; this ensures consistent behavior.
Apply:

```diff
-        # Extract dp_rank from request if available
-        dp_rank = request.get("dp_rank")
+        # Extract and normalize dp_rank from request if available
+        dp_rank_raw = request.get("dp_rank")
+        try:
+            dp_rank = int(dp_rank_raw) if dp_rank_raw is not None else None
+            if dp_rank is not None and dp_rank < 0:
+                dp_rank = None
+        except (TypeError, ValueError):
+            dp_rank = None
```

lib/llm/src/kv_router/indexer.rs (1)
309-312: Early-exit semantics now depend on dp_ranks; consider worker_id aggregation
With dp_rank keys, `block.borrow().workers.len()` counts ranks, not unique worker_ids. If you want early_exit when exactly one worker_id owns the block (regardless of dp_ranks), aggregate by worker_id.
Apply this diff:

```diff
-        if early_exit && block.borrow().workers.len() == 1 {
-            break;
-        }
+        if early_exit {
+            use std::collections::HashSet;
+            let unique = {
+                let b = block.borrow();
+                b.workers
+                    .keys()
+                    .map(|w| w.worker_id)
+                    .collect::<HashSet<_>>()
+                    .len()
+            };
+            if unique == 1 {
+                break;
+            }
+        }
```

lib/llm/src/kv_router/scheduler.rs (1)
496-548: Reduce log verbosity in hot selection path
`tracing::info!` inside the per-worker-per-rank loop will spam logs under load. Use debug/trace or guard behind a feature flag.
Example:

```diff
-            tracing::info!(
+            tracing::trace!(
                 "Formula for worker_id={} dp_rank={:?} with {overlap} cached blocks: {logit:.3} \
                 = {overlap_weight:.1} * prefill_blocks + decode_blocks \
                 = {overlap_weight:.1} * {potential_prefill_block:.3} + {decode_block:.3}",
                 worker.worker_id,
                 worker.dp_rank
             );
```

lib/llm/src/kv_router/approx.rs (3)
108-115: Fix docstring drift for TimerManager::insert
The signature changed to a batch insert with TTL; the docs still mention a single key and a duration param.

```diff
-    /// Inserts a new timer or updates an existing one for the given key.
-    ///
-    /// # Arguments
-    /// * `key` - The unique key for the timer.
-    /// * `duration` - The duration from now when the timer should expire.
+    /// Inserts or refreshes timers for the provided keys.
+    ///
+    /// Batches multiple keys and sets each to now() + self.ttl. If a key already exists,
+    /// its expiry is updated.
     pub fn insert(&mut self, keys: Vec<K>) {
```
218-220: Confirm remove_worker semantics across dp_ranks
remove_worker takes WorkerId only. With dp-aware keys in the trie, ensure this removes all WorkerWithDpRank variants for that worker_id. If not, expired/stale entries may linger.
I can help add a unit test that registers two dp_ranks for the same worker_id and asserts both are cleared by remove_worker.
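A self-contained sketch of the semantics such a test should pin down; the map here is a stand-in for the real trie state, not the actual approx.rs API:

```rust
use std::collections::HashMap;

// Stand-in for the dp-aware block ownership map; the real structure in
// approx.rs is more involved, but the removal semantics under test are the same.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerWithDpRank {
    worker_id: u64,
    dp_rank: Option<u32>,
}

fn remove_worker(workers: &mut HashMap<WorkerWithDpRank, usize>, worker_id: u64) {
    // Must drop *every* dp_rank variant for the worker_id, not just one key.
    workers.retain(|w, _| w.worker_id != worker_id);
}

fn main() {
    let mut workers = HashMap::new();
    workers.insert(WorkerWithDpRank { worker_id: 42, dp_rank: Some(0) }, 3);
    workers.insert(WorkerWithDpRank { worker_id: 42, dp_rank: Some(1) }, 5);
    workers.insert(WorkerWithDpRank { worker_id: 7, dp_rank: Some(0) }, 2);

    remove_worker(&mut workers, 42);
    assert!(workers.keys().all(|w| w.worker_id != 42));
    assert_eq!(workers.len(), 1); // unrelated worker survives
}
```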
53-55: Prefer SequenceHash alias for clarity
sequence_hashes is `Vec<u64>`. Use `SequenceHash` to keep types consistent with the tokens module.

```diff
-    /// The sequence hashes of the tokens sent to the worker.
-    sequence_hashes: Vec<u64>,
+    /// The sequence hashes of the tokens sent to the worker.
+    sequence_hashes: Vec<SequenceHash>,
```
462-547: General: avoid let-chains for side-effect operations in subscriptions
let-chains can inadvertently short-circuit side effects (e.g., removals). The proposed changes split conditions and add warnings, improving robustness and debuggability.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- components/src/dynamo/router/__main__.py (1 hunks)
- components/src/dynamo/vllm/handlers.py (3 hunks)
- components/src/dynamo/vllm/main.py (1 hunks)
- lib/bindings/c/src/lib.rs (2 hunks)
- lib/bindings/python/rust/llm/kv.rs (7 hunks)
- lib/bindings/python/rust/llm/local_model.rs (1 hunks)
- lib/llm/src/kv_router.rs (5 hunks)
- lib/llm/src/kv_router/approx.rs (16 hunks)
- lib/llm/src/kv_router/indexer.rs (31 hunks)
- lib/llm/src/kv_router/protocols.rs (5 hunks)
- lib/llm/src/kv_router/publisher.rs (8 hunks)
- lib/llm/src/kv_router/recorder.rs (2 hunks)
- lib/llm/src/kv_router/scheduler.rs (14 hunks)
- lib/llm/src/kv_router/sequence.rs (22 hunks)
- lib/llm/src/local_model/runtime_config.rs (1 hunks)
- lib/llm/src/mocker/engine.rs (1 hunks)
- lib/llm/src/protocols/common/preprocessor.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#3184
File: docs/architecture/kv_cache_routing.md:70-73
Timestamp: 2025-09-23T20:08:37.105Z
Learning: PeaBrane prefers to keep documentation diagrams simplified to avoid visual overload, even when this means sacrificing some technical precision for the sake of clarity and comprehension. They prioritize pedagogical effectiveness over exhaustive technical detail in architectural diagrams.
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#2756
File: lib/llm/src/kv_router/subscriber.rs:36-44
Timestamp: 2025-08-29T10:03:48.330Z
Learning: PeaBrane prefers to keep PRs contained in scope and is willing to defer technical improvements to future PRs when the current implementation works for the immediate use case. They acknowledge technical debt but prioritize deliverability over completeness in individual PRs.
📚 Learning: 2025-06-05T01:04:24.775Z
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.
Applied to files:
lib/bindings/python/rust/llm/kv.rs
🧬 Code graph analysis (9)
- lib/llm/src/kv_router/publisher.rs → lib/bindings/python/rust/llm/kv.rs: new (15 definitions)
- lib/llm/src/kv_router/sequence.rs → lib/llm/src/kv_router/protocols.rs: new (17-19), from_worker_id (21-26)
- lib/llm/src/kv_router/scheduler.rs → lib/llm/src/kv_router/protocols.rs: from_worker_id (21-26)
- lib/bindings/python/rust/llm/kv.rs → lib/bindings/python/src/dynamo/_core.pyi: KvEventPublisher (773-806), component (88-92), Component (104-131), process_routing_decision_for_request (669-673); lib/llm/src/kv_router/indexer.rs: new, kv_block_size; lib/llm/src/kv_router/publisher.rs: new, kv_block_size (161-163); lib/llm/src/kv_router/approx.rs: process_routing_decision_for_request (329-345)
- lib/llm/src/kv_router.rs → lib/llm/src/kv_router/scheduler.rs: add_request (330-342); lib/llm/src/kv_router/sequence.rs: add_request (117-150, 588-646)
- lib/llm/src/kv_router/indexer.rs → lib/bindings/python/rust/llm/kv.rs: new, remove_worker (395-398), clear_all_blocks (400-403), scores (327-334); lib/llm/src/kv_router/protocols.rs: from_worker_id (21-26)
- components/src/dynamo/vllm/handlers.py → generate in components/src/dynamo/router/__main__.py (79-124), lib/llm/src/kv_router.rs (446-475, 515-595), lib/llm/src/mocker/engine.rs (314-432, 495-506), lib/bindings/python/src/dynamo/_core.pyi (1186-1218; get 1346-1347), components/src/dynamo/sglang/request_handlers/handler_base.py (54-64)
- lib/llm/src/protocols/common/preprocessor.rs → lib/llm/src/preprocessor.rs: builder (178-222)
- lib/llm/src/kv_router/approx.rs → lib/bindings/python/rust/llm/kv.rs: apply_event (376-393), scores (327-334); lib/llm/src/kv_router/indexer.rs: apply_event (334-442, 783, 1006-1008, 1259-1280); lib/llm/src/kv_router/protocols.rs: from_worker_id (21-26)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/3597/merge) by PeaBrane.
components/src/dynamo/vllm/handlers.py
[error] 208-214: Black formatting check failed. Files were reformatted by this hook. All changes made by hooks. Process completed with exit code 1.
🪛 Ruff (0.14.0)
components/src/dynamo/vllm/handlers.py
215-215: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: vllm (amd64)
- GitHub Check: Build and Test - dynamo
- GitHub Check: clippy (launch/dynamo-run)
- GitHub Check: tests (launch/dynamo-run)
- GitHub Check: tests (lib/bindings/python)
- GitHub Check: clippy (.)
- GitHub Check: tests (lib/runtime/examples)
- GitHub Check: clippy (lib/bindings/python)
- GitHub Check: tests (.)
🔇 Additional comments (20)
lib/llm/src/kv_router/protocols.rs (5)
8-27: WorkerWithDpRank introduction looks solid
Proper derives (Eq, Hash, Serialize/Deserialize) and constructors enable HashMap keys and stable serde. Backward compat is maintained via Option. A rough sketch of the described shape follows.
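Inferred from the derives and constructors named in this review; the exact field types are assumptions and the definition in protocols.rs is authoritative:

```rust
use serde::{Deserialize, Serialize};

// Sketch of the described shape; extra derives or different integer widths
// may exist in the actual protocols.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WorkerWithDpRank {
    pub worker_id: u64,
    pub dp_rank: Option<u32>,
}

impl WorkerWithDpRank {
    pub fn new(worker_id: u64, dp_rank: Option<u32>) -> Self {
        Self { worker_id, dp_rank }
    }

    /// Backward-compatible constructor for callers that only know a worker_id.
    pub fn from_worker_id(worker_id: u64) -> Self {
        Self { worker_id, dp_rank: None }
    }
}
```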
56-66: WorkerSelectionResult now returns WorkerWithDpRank
Good refactor; verify all callers were updated to use `.worker.worker_id` where needed.
176-183: ActiveSequenceEvent carries WorkerWithDpRank
Consistent with the scheduler/sequence changes; improves DP-aware routing context.
223-226: KvCacheEvent.dp_rank addition is backward compatible
Use of serde default + Option ensures a missing field still deserializes; skipping when None keeps payloads lean, as illustrated below.
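A minimal, runnable illustration of that serde pattern (requires the serde and serde_json crates), shown on a stripped-down event type rather than the real KvCacheEvent:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Event {
    event_id: u64,
    // default: old payloads without the field still deserialize to None;
    // skip_serializing_if: None is omitted on the wire.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    dp_rank: Option<u32>,
}

fn main() {
    let old: Event = serde_json::from_str(r#"{"event_id": 1}"#).unwrap();
    assert_eq!(old.dp_rank, None);
    assert_eq!(serde_json::to_string(&old).unwrap(), r#"{"event_id":1}"#);
}
```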
338-341: Tests updated for dp_rank None
Good coverage for serialization without dp_rank.
lib/llm/src/mocker/engine.rs (1)
283-287: Publish KV events with dp_rank
Setting `dp_rank: Some(dp_rank)` aligns events with the new protocol; LGTM.
lib/llm/src/kv_router/recorder.rs (2)
53-54: Recorder test: dp_rank explicit None
Keeps tests aligned with the extended KvCacheEvent. LGTM.
69-70: Remove event: dp_rank explicit None
Consistent with the store event; LGTM.
components/src/dynamo/vllm/handlers.py (1)
84-93: Verify vLLM engine_client.generate supports the data_parallel_rank kwarg
Confirm the vLLM `EngineClient.generate` signature accepts the `data_parallel_rank` keyword to avoid a runtime TypeError.
lib/llm/src/kv_router/indexer.rs (1)
672-699: OverlapScores keyed by WorkerWithDpRank looks good
The map now tracks per-rank scores; frequencies are preserved. This aligns with the scheduler changes.
lib/llm/src/kv_router.rs (1)
355-380: WorkerWithDpRank integration in scheduling path looks correct
Scheduling returns WorkerWithDpRank, ApproxKvIndexer receives the same, and the overlap lookup uses the richer key before returning worker_id to callers.
lib/bindings/python/rust/llm/kv.rs (2)
327-334: Aggregating OverlapScores over dp_ranks is appropriate for Python callers
Summing per-rank scores into a worker_id-keyed map provides a stable public surface while preserving internal per-rank routing.
1151-1169: Aggregating PotentialLoad across dp_ranks for the same worker_id looks good
Merges per-rank loads before pythonize; aligns with the OverlapScores aggregation. The sketch below illustrates the pattern.
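A self-contained sketch of the aggregation pattern described above (plain tuples stand in for WorkerWithDpRank so the example runs on its own):

```rust
use std::collections::HashMap;

// Per-rank scores keyed by (worker_id, dp_rank) are summed into a
// worker_id-keyed map before crossing the Python boundary.
fn aggregate_by_worker(scores: &HashMap<(u64, Option<u32>), f64>) -> HashMap<u64, f64> {
    let mut out: HashMap<u64, f64> = HashMap::new();
    for (&(worker_id, _dp_rank), &score) in scores {
        *out.entry(worker_id).or_insert(0.0) += score;
    }
    out
}

fn main() {
    let mut scores = HashMap::new();
    scores.insert((42, Some(0)), 1.5);
    scores.insert((42, Some(1)), 0.5);
    scores.insert((7, None), 2.0);
    let agg = aggregate_by_worker(&scores);
    assert_eq!(agg[&42], 2.0); // two ranks of worker 42 merged
    assert_eq!(agg[&7], 2.0);
}
```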
lib/llm/src/kv_router/scheduler.rs (1)
388-457: softmax_sample over WorkerWithDpRank keys is consistent
Keying logits by WorkerWithDpRank and handling the temperature=0 edge case correctly preserves deterministic selection ties.
lib/llm/src/kv_router/approx.rs (2)
239-246: dp_rank propagation into RouterEvent looks correct
WorkerId and dp_rank are passed through explicitly on both store and remove paths. Matches indexer::apply_event expectations.
Based on relevant snippets
Also applies to: 272-281
310-345: API shift to WorkerWithDpRank is consistent
process_routing_decision and ..._for_request now take WorkerWithDpRank and forward it cleanly to the background task.
lib/llm/src/kv_router/sequence.rs (4)
797-845: Per-dp_rank return shape for potential_blocks_and_tokens
Returning `HashMap<WorkerWithDpRank, usize>` for both results aligns with dp-aware overlaps and keeps per-rank detail while dispatching by worker_id. Looks good.
Consider adding a test where two dp_ranks share a WorkerId to confirm both keys receive results, as sketched below.
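A hypothetical sketch of what that test should assert; the types and the trivial stand-in function are illustrative, not the real sequence.rs API:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerWithDpRank {
    worker_id: u64,
    dp_rank: Option<u32>,
}

// Stand-in: the real implementation queries each worker's active sequences.
fn potential_blocks(workers: &[WorkerWithDpRank]) -> HashMap<WorkerWithDpRank, usize> {
    workers.iter().map(|&w| (w, 0usize)).collect()
}

fn main() {
    let ranks = [
        WorkerWithDpRank { worker_id: 42, dp_rank: Some(0) },
        WorkerWithDpRank { worker_id: 42, dp_rank: Some(1) },
    ];
    let result = potential_blocks(&ranks);
    // Both dp_ranks of worker 42 must appear as distinct keys in the result.
    assert_eq!(result.len(), 2);
    assert!(result.keys().all(|w| w.worker_id == 42));
}
```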
571-573: Cleanup mapping on worker removal is correct
The retain() predicate drops all entries whose mapped worker.worker_id matches the removed WorkerId.
284-286: request_to_worker mapped to WorkerWithDpRank
Good call; preserves dp_rank in subsequent frees/prefill updates and across replica sync.
519-526: Bug: Free event may not remove mapping due to let-chain
Using let-chains ties mapping removal to sender presence. If the sender is missing, the mapping is not removed, causing leaks and future panics.

```diff
-            if let Some((_, worker)) = request_to_worker.remove(&event.request_id)
-                && let Some(sender) = senders.get(&worker.worker_id)
-            {
-                let _ = sender.send(UpdateSequences::Free {
-                    request_id: event.request_id.clone(),
-                });
-            }
+            if let Some((_, worker)) = request_to_worker.remove(&event.request_id) {
+                if let Some(sender) = senders.get(&worker.worker_id) {
+                    let _ = sender.send(UpdateSequences::Free {
+                        request_id: event.request_id.clone(),
+                    });
+                } else {
+                    tracing::warn!(
+                        "Sender for worker {:?} not found while freeing request {}",
+                        worker,
+                        event.request_id
+                    );
+                }
+            }
```

Likely an incorrect or invalid review comment.
Signed-off-by: PeaBrane <[email protected]>
Walkthrough
Adds optional data-parallel rank (dp_rank) support end-to-end: request preprocessing, Python handlers, runtime config, Rust protocols, KV router (indexer, scheduler, sequence), publishers/listeners, and bindings. Worker identity shifts from raw worker_id to WorkerWithDpRank. Loads and overlap scores become dp-rank-aware; potential loads aggregate by worker_id.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Client
    participant Router as Router CLI
    participant vH as vLLM Handlers
    participant Engine as Engine Client
    participant RouterCore as KV Router (Indexer/Scheduler)
    Client->>Router: Request { ..., dp_rank? }
    Router->>vH: PreprocessedRequest { ..., dp_rank? }
    vH->>Engine: generate(..., data_parallel_rank=dp_rank?)
    note right of Engine: Uses runtime data_parallel_size if set
    Engine-->>vH: Tokens/Updates
    vH-->>Client: Streaming/Final response
    vH->>RouterCore: KV events with dp_rank
    RouterCore->>RouterCore: Index, score overlaps per WorkerWithDpRank
    RouterCore-->>vH: Best worker (WorkerWithDpRank)
```

```mermaid
sequenceDiagram
    autonumber
    participant Pub as KvEventPublisher (Py/Rust)
    participant ZMQ as ZMQ Listener
    participant Indexer as ApproxKvIndexer
    participant Sched as Scheduler
    participant Seq as ActiveSequences
    Pub->>ZMQ: KvEventBatch { ..., dp_rank }
    ZMQ->>Indexer: KvCacheEvent { ..., dp_rank }
    Indexer->>Sched: Loads/overlaps keyed by WorkerWithDpRank
    Sched->>Seq: best_worker: WorkerWithDpRank
    Seq-->>Sched: Potential loads (by WorkerWithDpRank)
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/llm/src/kv_router/scheduler.rs (1)
388-457: Fix potential keys/values misalignment in softmax_sample
`keys()` and `values()` are iterated separately. Their orders are not guaranteed to align across independent iterators, risking returning a key with someone else's probability. Build a single vector of (key, value) pairs instead.
Apply:

```diff
 fn softmax_sample(logits: &HashMap<WorkerWithDpRank, f64>, temperature: f64) -> WorkerWithDpRank {
     if logits.is_empty() {
         panic!("Empty logits for softmax sampling");
     }

     // Guard: if temperature is 0, return the key with the smallest logit value
     if temperature == 0.0 {
-        // Find the minimum logit value
-        let min_logit = logits.values().fold(f64::INFINITY, |a, &b| a.min(b));
-
-        // Collect all keys with the minimum logit value (to handle ties)
-        let min_keys: Vec<_> = logits
-            .iter()
-            .filter(|&(_, &v)| v == min_logit)
-            .map(|(k, _)| *k)
-            .collect();
+        // Find min logit and collect all keys matching it (handle ties)
+        let mut min_logit = f64::INFINITY;
+        let mut min_keys: Vec<WorkerWithDpRank> = Vec::new();
+        for (&k, &v) in logits.iter() {
+            if v < min_logit {
+                min_logit = v;
+                min_keys.clear();
+                min_keys.push(k);
+            } else if v == min_logit {
+                min_keys.push(k);
+            }
+        }

         // Randomly select from the minimum keys (handles single key case naturally)
         let mut rng = rand::rng();
         let index = rng.random_range(0..min_keys.len());
         return min_keys[index];
     }

-    let keys: Vec<_> = logits.keys().copied().collect();
-    let values: Vec<_> = logits.values().copied().collect();
+    // Keep keys and values aligned
+    let entries: Vec<(WorkerWithDpRank, f64)> =
+        logits.iter().map(|(&k, &v)| (k, v)).collect();
+    let keys: Vec<WorkerWithDpRank> = entries.iter().map(|(k, _)| *k).collect();
+    let values: Vec<f64> = entries.iter().map(|(_, v)| *v).collect();

     // Find min and max for normalization
     let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
     let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));

     let probabilities = if min_val == max_val {
         // All values are the same, uniform probability
         vec![1.0 / keys.len() as f64; keys.len()]
     } else {
         // Normalize values
-        let normalized: Vec<_> = values
+        let normalized: Vec<f64> = values
             .iter()
             .map(|&v| {
                 // Lower is better, so negate
                 // Note we don't need to do actual min-max norm here, just off by an offset
                 let norm = v / (max_val - min_val);
                 -norm
             })
             .collect();

         // Apply temperature and softmax
-        let scaled: Vec<_> = normalized.iter().map(|&v| v / temperature).collect();
+        let scaled: Vec<f64> = normalized.iter().map(|&v| v / temperature).collect();
         let max_scaled = scaled.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
-        let exp_values: Vec<_> = scaled.iter().map(|&v| (v - max_scaled).exp()).collect();
+        let exp_values: Vec<f64> = scaled.iter().map(|&v| (v - max_scaled).exp()).collect();
         let sum_exp: f64 = exp_values.iter().sum();
         exp_values.iter().map(|&v| v / sum_exp).collect()
     };

     // Sample from the probability distribution
     let mut rng = rand::rng();
     let sample: f64 = rng.random();
     let mut cumsum = 0.0;
     for (i, &prob) in probabilities.iter().enumerate() {
         cumsum += prob;
         if sample <= cumsum {
             return keys[i];
         }
     }

     // Fallback to last key (shouldn't normally reach here)
     keys[keys.len() - 1]
 }
```
🧹 Nitpick comments (2)
lib/llm/src/kv_router/indexer.rs (1)
309-312: Early-exit should consider unique worker_id, not dp_rank
The current check breaks early only when exactly one WorkerWithDpRank is present. If a single worker exposes multiple dp_ranks, we miss the early exit. Consider exiting when there's exactly one unique `worker_id`.
Example tweak:
- Compute a bool like `let one_worker_id = block.borrow().workers.keys().map(|w| w.worker_id).collect::<std::collections::HashSet<_>>().len() == 1;`
- Use `if early_exit && one_worker_id { break; }`
lib/llm/src/kv_router/scheduler.rs (1)
541-547: Reduce log level in hot scheduling path
A per-worker log at `info!` will be noisy under load. Consider `debug!` or `trace!` for the per-worker formula and the final selection log to avoid log volume spikes. Also applies to: 569-576
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- components/src/dynamo/router/__main__.py (1 hunks)
- components/src/dynamo/vllm/handlers.py (3 hunks)
- components/src/dynamo/vllm/main.py (1 hunks)
- lib/bindings/c/src/lib.rs (2 hunks)
- lib/bindings/python/rust/llm/kv.rs (9 hunks)
- lib/bindings/python/rust/llm/local_model.rs (1 hunks)
- lib/llm/src/kv_router.rs (7 hunks)
- lib/llm/src/kv_router/approx.rs (16 hunks)
- lib/llm/src/kv_router/indexer.rs (31 hunks)
- lib/llm/src/kv_router/protocols.rs (5 hunks)
- lib/llm/src/kv_router/publisher.rs (8 hunks)
- lib/llm/src/kv_router/recorder.rs (2 hunks)
- lib/llm/src/kv_router/scheduler.rs (14 hunks)
- lib/llm/src/kv_router/sequence.rs (22 hunks)
- lib/llm/src/local_model/runtime_config.rs (1 hunks)
- lib/llm/src/mocker/engine.rs (1 hunks)
- lib/llm/src/protocols/common/preprocessor.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#3184
File: docs/architecture/kv_cache_routing.md:70-73
Timestamp: 2025-09-23T20:08:37.105Z
Learning: PeaBrane prefers to keep documentation diagrams simplified to avoid visual overload, even when this means sacrificing some technical precision for the sake of clarity and comprehension. They prioritize pedagogical effectiveness over exhaustive technical detail in architectural diagrams.
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#2756
File: lib/llm/src/kv_router/subscriber.rs:36-44
Timestamp: 2025-08-29T10:03:48.330Z
Learning: PeaBrane prefers to keep PRs contained in scope and is willing to defer technical improvements to future PRs when the current implementation works for the immediate use case. They acknowledge technical debt but prioritize deliverability over completeness in individual PRs.
📚 Learning: 2025-06-05T01:04:24.775Z
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.
Applied to files:
lib/bindings/python/rust/llm/kv.rs
🧬 Code graph analysis (9)
- lib/llm/src/protocols/common/preprocessor.rs → lib/llm/src/preprocessor.rs: builder (178-222)
- components/src/dynamo/vllm/handlers.py → generate in lib/bindings/python/rust/llm/kv.rs (983-1059), lib/llm/src/kv_router.rs (446-475, 515-597), components/src/dynamo/router/__main__.py (79-124), lib/llm/src/mocker/engine.rs (314-432, 495-506)
- lib/llm/src/kv_router/protocols.rs → lib/bindings/python/rust/llm/kv.rs: new (7 definitions); lib/llm/src/kv_router/publisher.rs: new (5 definitions)
- lib/llm/src/kv_router/sequence.rs → lib/llm/src/kv_router/protocols.rs: new (17-19), from_worker_id (21-26)
- lib/llm/src/kv_router/scheduler.rs → lib/llm/src/kv_router.rs: new (134-155, 218-318, 484-489), block_size (415-417); lib/llm/src/kv_router/protocols.rs: from_worker_id (21-26)
- lib/llm/src/kv_router/approx.rs → lib/bindings/python/rust/llm/kv.rs: apply_event (376-393), scores (327-334); lib/llm/src/kv_router/indexer.rs: apply_event (334-442, 783, 1006-1008, 1259-1280); lib/llm/src/kv_router/protocols.rs: from_worker_id (21-26)
- lib/llm/src/kv_router/indexer.rs → lib/llm/src/kv_router/scoring.rs: worker_id (26-37), worker_ids (71-73); lib/bindings/python/rust/llm/kv.rs: new, remove_worker (395-398), clear_all_blocks (400-403), scores (327-334); lib/llm/src/kv_router/approx.rs: new (90-97, 183-303), remove_worker (385-387); lib/llm/src/kv_router/protocols.rs: from_worker_id (21-26)
- lib/bindings/python/rust/llm/kv.rs → lib/bindings/python/src/dynamo/_core.pyi: KvEventPublisher (773-806), component (88-92), Component (104-131), process_routing_decision_for_request (669-673); lib/llm/src/kv_router/publisher.rs: new, kv_block_size (161-163); lib/llm/src/kv_router/approx.rs: process_routing_decision_for_request (329-345)
- lib/llm/src/kv_router.rs → lib/llm/src/kv_router/scheduler.rs: add_request (330-342); lib/llm/src/kv_router/sequence.rs: add_request (117-150, 588-646)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: changed-files
- GitHub Check: Build and Test - dynamo
- GitHub Check: tests (lib/bindings/python)
- GitHub Check: tests (launch/dynamo-run)
- GitHub Check: tests (.)
- GitHub Check: tests (lib/runtime/examples)
- GitHub Check: clippy (launch/dynamo-run)
- GitHub Check: clippy (lib/bindings/python)
- GitHub Check: clippy (.)
🔇 Additional comments (10)
components/src/dynamo/router/__main__.py (1)
102-102: LGTM! Properly propagates dp_rank from incoming request
The change correctly extracts `dp_rank` from the request and includes it in the preprocessed request dictionary, following the same pattern as other optional fields.
lib/bindings/python/rust/llm/local_model.rs (1)
47-50: LGTM! Setter follows established pattern
The new `set_data_parallel_size` setter is consistent with other setters in the file and properly exposes the `data_parallel_size` field to Python.
lib/llm/src/local_model/runtime_config.rs (1)
22-24: LGTM! Field addition follows existing patterns
The new `data_parallel_size` field is properly defined with appropriate documentation and serde attributes, consistent with other optional fields in the struct.
lib/llm/src/kv_router/recorder.rs (1)
53-53: LGTM! Test helpers updated correctly
The test helper functions properly initialize the new `dp_rank` field with `None`, maintaining test functionality after the field addition. Also applies to: 69-69
lib/llm/src/mocker/engine.rs (1)
286-286: LGTM! Properly propagates dp_rank to KV events
The change correctly assigns the `dp_rank` value to the `KvCacheEvent`, enabling per-DP-rank event tracking from the mock engine.
components/src/dynamo/vllm/main.py (1)
302-307: LGTM! Correctly configures data parallel size from vLLM config
The change properly extracts `data_parallel_size` from the vLLM configuration and sets it in the runtime config when data parallelism is enabled (size > 1). The safe attribute access and conditional check ensure robustness.
lib/bindings/c/src/lib.rs (1)
210-210: LGTM! C bindings updated with new field
The KvCacheEvent constructors in the C bindings properly initialize the new `dp_rank` field with `None`, which is appropriate for the C FFI context. Also applies to: 228-228
lib/llm/src/protocols/common/preprocessor.rs (1)
68-71: LGTM! Field addition is well-structured
The new `dp_rank` field is properly defined with appropriate documentation, builder, and serde attributes. It follows the established pattern for optional fields in this struct.
lib/llm/src/kv_router/approx.rs (1)
239-246: DP-rank correctly threaded through event construction and expiry
`RouterEvent` now carries `dp_rank` end-to-end (store/remove). Timer entries retain the worker identity with dp_rank. This preserves per-rank index state accurately. Also applies to: 250-253, 272-281
lib/bindings/python/rust/llm/kv.rs (1)
244-250: Python API: ensure stubs/docs reflect the new dp_rank argument
Bindings correctly add `dp_rank: Option<u32>` to:
- KvEventPublisher.__init__
- KvEventPublisher.publish_{stored,removed} (payload propagation)
- ApproxKvIndexer.process_routing_decision_for_request
Please update the Python type stubs and docs to match the new optional parameter so downstream users see the dp-aware API. For example, update `_core.pyi` KvEventPublisher.__init__ to include `dp_rank: Optional[int] = None`. Based on learnings. Also applies to: 284-299, 303-314, 535-552
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
bro brought it back from the dead
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
/ok to test 34d16b1
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Overview:
- Introduced `WorkerWithDpRank`, which includes `worker_id` and `dp_rank`. All core components in `kv_router` now work with this protocol.
- Added `dp_size` as another `RuntimeConfig` field.

Minor Notes
- `find_best_match` at the Python binding level still returns `worker_id` (without the `dp_rank` info); otherwise it would be too breaking for many downstream components (e.g. sglang). Will think about dealing with this in future PRs to keep the scope in check.
- `dp_rank` stays optional in `PreprocessedRequest`. However, when it reaches the core components in `kv_router` and `mocker`, it is always unwrapped to 0, otherwise the indexer + slot manager logics become too messy (see the sketch after these notes). Naturally, this means the publisher needs to publish a non-optional dp_rank (which I believe is already the current behavior).
- e2e tests
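A tiny sketch of the unwrap-to-0 convention described above; the struct is a stand-in with just the relevant field, not the real PreprocessedRequest:

```rust
// dp_rank is optional at the API edge but normalized to 0 inside the router core.
struct PreprocessedRequest {
    dp_rank: Option<u32>,
}

fn core_dp_rank(req: &PreprocessedRequest) -> u32 {
    // Core components (kv_router, mocker) never see a missing rank.
    req.dp_rank.unwrap_or(0)
}

fn main() {
    assert_eq!(core_dp_rank(&PreprocessedRequest { dp_rank: None }), 0);
    assert_eq!(core_dp_rank(&PreprocessedRequest { dp_rank: Some(2) }), 2);
}
```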