
Commit d546795

Performance improvements for prefix cache routing (#933)
* Performance improvements for prefix cache routing

Signed-off-by: Varun Gupta <varungup90@gmail.com>
1 parent 4f25c42 commit d546795

37 files changed: +1027 −656

benchmarks/client/client.py

Lines changed: 4 additions & 1 deletion

```diff
@@ -53,6 +53,7 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
     start_time = asyncio.get_event_loop().time()
     first_response_time = None
     target_pod = ""
+    target_request_id = ""
     try:
         cur_time = time.time()
         logging.warning(f"send_request_streaming: Prepare to launch task after {target_time - cur_time}")
@@ -66,6 +67,7 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
         )
         if hasattr(response_stream, 'response') and hasattr(response_stream.response, 'headers'):
             target_pod = response_stream.response.headers.get('target-pod')
+            target_request_id = response_stream.response.headers.get('request-id')
 
         text_chunks = []
         prompt_tokens = 0
@@ -117,6 +119,7 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
             "ttft": ttft,
             "tpot": tpot,
             "target_pod": target_pod,
+            "target_request_id": target_request_id,
             "session_id": session_id,
         }
 
@@ -141,6 +144,7 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
             "start_time": start_time,
             "end_time": error_time,
             "target_pod": target_pod,
+            "target_request_id": target_request_id,
             "session_id": session_id,
         }
         logging.error(f"Request {request_id}: Error ({error_type}): {str(e)}")
@@ -375,4 +379,3 @@ def main(args):
 
     args = parser.parse_args()
     main(args)
-
```

config/gateway/gateway-plugin/gateway-plugin.yaml

Lines changed: 8 additions & 0 deletions

```diff
@@ -58,6 +58,14 @@ spec:
           value: "6379"
         - name: AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS
           value: "50"
+        - name: AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE
+          value: "character"
+        - name: AIBRIX_PREFIX_CACHE_BLOCK_SIZE
+          value: "128"
+        - name: AIBRIX_PREFIX_CACHE_POD_RUNNING_REQUEST_IMBALANCE_ABS_COUNT
+          value: "16"
+        - name: AIBRIX_PREFIX_CACHE_STANDARD_DEVIATION_FACTOR
+          value: "2"
         # - name: AIBRIX_PREFIX_CACHE_EVICTION_DURATION_MINS
         #   value: "1"
         - name: POD_NAME
```

config/overlays/release/default_patch.yaml

Lines changed: 11 additions & 1 deletion

```diff
@@ -17,4 +17,14 @@ spec:
           memory: 8Gi
       env:
         - name: AIBRIX_GPU_OPTIMIZER_TRACING_FLAG
-          value: "false"
+          value: "false"
+        - name: AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE
+          value: "character"
+        - name: AIBRIX_PREFIX_CACHE_BLOCK_SIZE
+          value: "128"
+        - name: AIBRIX_PREFIX_CACHE_BLOCK_NUMBER
+          value: "200000"
+        - name: AIBRIX_PREFIX_CACHE_POD_RUNNING_REQUEST_IMBALANCE_ABS_COUNT
+          value: "16"
+        - name: AIBRIX_PREFIX_CACHE_STANDARD_DEVIATION_FACTOR
+          value: "2"
```

pkg/cache/cache_init.go

Lines changed: 9 additions & 7 deletions

```diff
@@ -91,22 +91,24 @@ func New(redisClient *redis.Client, prometheusApi prometheusv1.API) *Store {
     }
 }
 
-func NewTestCacheWithPods(pods []*v1.Pod) *Store {
+func NewTestCacheWithPods(pods []*v1.Pod, model string) *Store {
     c := &Store{}
     for _, pod := range pods {
         pod.Labels = make(map[string]string)
-        pod.Labels[modelIdentifier] = "modelName"
+        pod.Labels[modelIdentifier] = model
         c.addPod(pod)
     }
     return c
 }
 
-func NewTestCacheWithPodsMetrics(pods []*v1.Pod, podMetrics map[string]map[string]metrics.MetricValue) *Store {
-    c := NewTestCacheWithPods(pods)
+func NewTestCacheWithPodsMetrics(pods []*v1.Pod, model string, podMetrics map[string]map[string]metrics.MetricValue) *Store {
+    c := NewTestCacheWithPods(pods, model)
     c.metaPods.Range(func(podName string, metaPod *Pod) bool {
-        if metrics, ok := podMetrics[podName]; ok {
-            for metricName, metric := range metrics {
-                metaPod.Metrics.Store(metricName, metric)
+        if podmetrics, ok := podMetrics[podName]; ok {
+            for metricName, metric := range podmetrics {
+                if err := c.updatePodRecord(metaPod, model, metricName, metrics.PodMetricScope, metric); err != nil {
+                    return false
+                }
             }
         }
         return true
```
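
For orientation, here is a hypothetical test call site under the new model-aware signatures; the import paths and model name are assumptions from the repository layout, though `RealtimeNumRequestsRunning` and `SimpleMetricValue` appear elsewhere in this commit. Metrics supplied this way now flow through `updatePodRecord` instead of being stored on the pod directly.

```go
// Hypothetical sketch of building a test cache with the new signatures.
package cache_test

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vllm-project/aibrix/pkg/cache"
	"github.com/vllm-project/aibrix/pkg/metrics"
)

func newTestStore() *cache.Store {
	pods := []*v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "p1"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "p2"}},
	}
	podMetrics := map[string]map[string]metrics.MetricValue{
		// recorded via updatePodRecord with PodMetricScope under the hood
		"p1": {metrics.RealtimeNumRequestsRunning: &metrics.SimpleMetricValue{Value: 3}},
	}
	// the model name now flows into pod labels and per-pod metric records
	return cache.NewTestCacheWithPodsMetrics(pods, "example-model", podMetrics)
}
```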

pkg/cache/cache_metrics.go

Lines changed: 0 additions & 3 deletions

```diff
@@ -329,9 +329,6 @@ func (c *Store) queryUpdatePromQLMetrics(metric metrics.Metric, queryLabels map[
 // TODO: replace in-place metric update podMetrics and podModelMetrics to fresh copy for preventing stale metric keys
 func (c *Store) updatePodRecord(pod *Pod, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
     if scope == metrics.PodMetricScope {
-        if modelName != "" {
-            return fmt.Errorf("modelName should be empty for scope %v", scope)
-        }
         pod.Metrics.Store(metricName, metricValue)
     } else if scope == metrics.PodModelMetricScope {
         if modelName == "" {
```
Lines changed: 106 additions & 0 deletions

# Routing Algorithms

## Prefix Cache Aware

Below is the pseudo-code for prefix-cache aware routing.

```shell
func prefix_cache_routing(input_tokens, ready_pods []*v1.Pod) {
  if check_load_imbalance(ready_pods) {
    target_pod = select_pod_with_least_running_requests(ready_pods)
  } else {
    match_pods, prefix_hashes = match_prefix(input_tokens, ready_pods)
    if len(match_pods) > 0 {
      target_pod = select_least_loaded_match_pod(match_pods, ready_pods)
    }
  }

  // if no target pod is selected, fall back to the pod with the least running requests
  if target_pod == nil {
    target_pod = select_pod_with_least_running_requests(ready_pods)
  }
}

func check_load_imbalance(ready_pods) {
  // find the pods with the min and max number of running requests
  min_pod = select_pod_min_running_requests()
  max_pod = select_pod_max_running_requests()

  // if the difference between the max & min running request counts exceeds the
  // configurable ABS_RUNNING_REQUEST_COUNT (see
  // AIBRIX_PREFIX_CACHE_POD_RUNNING_REQUEST_IMBALANCE_ABS_COUNT below),
  // the load is imbalanced
  if max_pod.running_requests - min_pod.running_requests > ABS_RUNNING_REQUEST_COUNT {
    return true
  }
  return false
}

func match_prefix(input_tokens, ready_pods) {
  // input_tokens are split into blocks of the configurable block_size and
  // a hash is calculated for each token block
  hashes = calculate_hashes(input_tokens)

  // check whether each token block already exists on the ready_pods (prefix match);
  // if present, compute pod_name -> prefix_match_percent
  match_pods_with_prefix_match_percent = check_hashes_on_ready_pods(hashes, ready_pods)
  return match_pods_with_prefix_match_percent, hashes
}

func select_least_loaded_match_pod(match_pods_with_prefix_match_percent, ready_pods) {
  mean = calculate_mean_running_request(ready_pods)
  std_dev = calculate_std_dev_running_request(ready_pods)

  // sort match_pods by decreasing prefix_match_percent and, for equal
  // prefix_match_percent, by increasing running_request count
  sort(match_pods_with_prefix_match_percent)

  // select the match pod with the highest prefix match whose
  // running_request <= (mean + load_factor * std_dev)
  for _, pod := range match_pods_with_prefix_match_percent {
    if pod.running_request <= mean + load_factor*std_dev {
      return pod
    }
  }
}

// selects the pod with the minimum running requests, similar to the least-request routing algorithm
func select_pod_with_least_running_requests(ready_pods) {
  return select_pod_min_running_requests()
}
```
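
The pseudo-code leaves `calculate_hashes` abstract. As a rough illustration only (not the gateway's actual hashing code), one common construction chains each block's hash with its predecessor's, so a match on the i-th hash implies the entire prefix up to block i matches. A minimal Go sketch, assuming FNV-1a as the hash:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// calculateHashes splits tokens into fixed-size blocks and emits one hash per
// full block. Each hash folds in the previous block's hash, so matching the
// i-th hash implies the whole prefix up to block i matches. Illustrative
// construction only, not the gateway's implementation.
func calculateHashes(tokens []byte, blockSize int) []uint64 {
	var hashes []uint64
	var prev uint64
	for start := 0; start+blockSize <= len(tokens); start += blockSize {
		h := fnv.New64a()
		fmt.Fprintf(h, "%d:", prev) // chain the previous block's hash
		h.Write(tokens[start : start+blockSize])
		prev = h.Sum64()
		hashes = append(hashes, prev)
	}
	return hashes
}

func main() {
	// with the character tokenizer, the "tokens" are simply the prompt's characters
	prompt := []byte("You are a helpful assistant. Summarize the document below.")
	fmt.Println(calculateHashes(prompt, 16)) // small block size for the demo
}
```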

## Configurations

- **_AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE_**

  The AIBrix gateway implements two tokenizers, **_character_** and **_tiktoken_**. The default tokenizer is <ins>**_character_**</ins>.

  | Tokenizer Type | Details |
  | ------------- | ------------- |
  | character | splits input text into characters |
  | tiktoken | open-source openai/tiktoken [tokenizer](https://github.com/openai/tiktoken) |

- **_AIBRIX_PREFIX_CACHE_BLOCK_SIZE_**

  The tokenized input request is split into blocks, and the hash value of each block is cached for future matches. The block size (i.e. the number of tokens per block) determines how effective the prefix match will be. The default is the <ins>**_character tokenizer with a block size of 128 (tokens per block)_**</ins>.

  | Tokenizer Type | Block Size Recommendation |
  | ------------- | ------------- |
  | character | 128 |
  | tiktoken | 16 |
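
  As a worked example: with the character tokenizer and a block size of 128, a 300-character prompt yields ⌊300 / 128⌋ = 2 hashable blocks; the trailing 44 characters never fill a block and so cannot contribute to a match. The tiktoken recommendation is smaller because each BPE token typically spans several characters, so 16 tokens cover a comparable stretch of text.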

- **AIBRIX_PREFIX_CACHE_BLOCK_NUMBER**

  Maximum number of prefix cache blocks. The default is <ins>**_200000_**</ins>.

- **AIBRIX_PREFIX_CACHE_POD_RUNNING_REQUEST_IMBALANCE_ABS_COUNT**

  Before evaluating the prefix cache match, the router checks whether running requests are imbalanced across pods. Imbalance is measured as the absolute difference between the max and min running request counts across pods. For example, if imbalance_abs_count = 16 and the running requests per pod are [p1: 1, p2: 2, p3: 20], the scenario is flagged as imbalanced. In that case the prefix match is ignored and the request is routed to the pod with the least running requests, which in the example above is pod p1. The default is <ins>**_16_**</ins> and should be adjusted based on GPU hardware & prompt length.
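
  A minimal Go sketch of this check, reusing the example counts above (names are illustrative, not the gateway's code):

  ```go
  package main

  import "fmt"

  // checkLoadImbalance reports whether the spread between the busiest and the
  // least busy pod exceeds the configured absolute count (illustrative sketch).
  func checkLoadImbalance(running map[string]int, imbalanceAbsCount int) bool {
      first := true
      var min, max int
      for _, n := range running {
          if first {
              min, max = n, n
              first = false
              continue
          }
          if n < min {
              min = n
          }
          if n > max {
              max = n
          }
      }
      return max-min > imbalanceAbsCount
  }

  func main() {
      running := map[string]int{"p1": 1, "p2": 2, "p3": 20}
      fmt.Println(checkLoadImbalance(running, 16)) // true: 20 - 1 = 19 > 16
  }
  ```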

- **AIBRIX_PREFIX_CACHE_STANDARD_DEVIATION_FACTOR**

  After prefix matching, the pods with a matching prefix cache are re-evaluated to prevent a hotspot scenario in which the bulk of prefix-matching requests is routed to the same pod. Imbalance is checked as follows:

  <pre>
  prefix_match_pod.running_requests <= mean + <b>load_factor</b> * standard_deviation
  </pre>

  **load_factor** determines the number of standard deviations. The default is <ins>**_2_**</ins>.
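
A Go sketch of this guard with assumed types (illustrative only; the real router first sorts candidates by prefix match percent, as in the pseudo-code above):

```go
package main

import (
	"fmt"
	"math"
)

type matchPod struct {
	name            string
	matchPercent    float64
	runningRequests int
}

// selectLeastLoadedMatchPod returns the first candidate (assumed pre-sorted by
// decreasing matchPercent, then increasing runningRequests) whose load stays
// within mean + loadFactor*stddev of all ready pods; nil means "fall back".
func selectLeastLoadedMatchPod(candidates []matchPod, readyCounts []int, loadFactor float64) *matchPod {
	var mean, sq float64
	for _, c := range readyCounts {
		mean += float64(c)
	}
	mean /= float64(len(readyCounts))
	for _, c := range readyCounts {
		d := float64(c) - mean
		sq += d * d
	}
	stddev := math.Sqrt(sq / float64(len(readyCounts)))

	for i := range candidates {
		if float64(candidates[i].runningRequests) <= mean+loadFactor*stddev {
			return &candidates[i]
		}
	}
	return nil
}

func main() {
	// eight idle pods and one hot pod holding the best prefix match
	ready := []int{1, 1, 1, 1, 1, 1, 1, 1, 30}
	best := selectLeastLoadedMatchPod([]matchPod{
		{"p9", 0.9, 30}, // best match, but beyond mean + 2*stddev (~22.4)
		{"p1", 0.5, 1},
	}, ready, 2)
	fmt.Println(best.name) // p1
}
```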

pkg/plugins/gateway/algorithms/least_request.go

Lines changed: 45 additions & 43 deletions

```diff
@@ -52,56 +52,20 @@ func NewLeastRequestRouter() (types.Router, error) {
     }, nil
 }
 
+// Routes request based on the least active request count among input ready pods
 func (r leastRequestRouter) Route(ctx *types.RoutingContext, pods types.PodList) (string, error) {
-    var targetPod *v1.Pod
-    minCount := math.MaxFloat64
-
-    if pods.Len() == 0 {
-        return "", fmt.Errorf("no pods to forward request")
-    }
-
-    readyPods := utils.FilterRoutablePods(pods.All())
-    if len(readyPods) == 0 {
-        return "", fmt.Errorf("no ready pods available for fallback")
-    }
-
-    for _, pod := range readyPods {
-        runningReq, err := r.cache.GetMetricValueByPodModel(pod.Name, ctx.Model, metrics.NumRequestsRunning)
-        if err != nil {
-            klog.Error(err)
-            continue
-        }
-        waitingReq, err := r.cache.GetMetricValueByPodModel(pod.Name, ctx.Model, metrics.NumRequestsWaiting)
-        if err != nil {
-            klog.Error(err)
-            continue
-        }
-        swappedReq, err := r.cache.GetMetricValueByPodModel(pod.Name, ctx.Model, metrics.NumRequestsSwapped)
-        if err != nil {
-            klog.Error(err)
-            continue
-        }
-
-        totalReq := runningReq.GetSimpleValue() + waitingReq.GetSimpleValue() + swappedReq.GetSimpleValue()
-        klog.V(4).Infof("pod: %v, podIP: %v, runningReq: %v, waitingReq: %v, swappedReq: %v, totalReq: %v",
-            pod.Name, pod.Status.PodIP, runningReq, waitingReq, swappedReq, totalReq)
-
-        if totalReq <= minCount {
-            minCount = totalReq
-            targetPod = pod
-        }
-    }
+    targetPod := selectTargetPodWithLeastRequestCount(r.cache, pods.All())
 
     // Use fallback if no valid metrics
     if targetPod == nil {
-        klog.Warning("No pods with valid metrics found; selecting a pod randomly as fallback")
+        klog.Warning("no pods with valid metrics found for least-request routing strategy; selecting a pod randomly as fallback",
+            "requestID", ctx.RequestID)
         var err error
         targetPod, err = selectRandomPod(pods.All(), rand.Intn)
         if err != nil {
             return "", err
         }
     }
-
     if targetPod == nil {
         return "", fmt.Errorf("no pods to forward request")
     }
@@ -112,8 +76,46 @@ func (r leastRequestRouter) Route(ctx *types.RoutingContext, pods types.PodList)
 
 func (r *leastRequestRouter) SubscribedMetrics() []string {
     return []string{
-        metrics.NumRequestsRunning,
-        metrics.NumRequestsWaiting,
-        metrics.NumRequestsSwapped,
+        metrics.RealtimeNumRequestsRunning,
+    }
+}
+
+func selectTargetPodWithLeastRequestCount(cache cache.Cache, readyPods []*v1.Pod) *v1.Pod {
+    var targetPod *v1.Pod
+    targetPods := []string{}
+
+    minCount := math.MaxInt32
+    podRequestCount := getRequestCounts(cache, readyPods)
+    for _, totalReq := range podRequestCount {
+        if totalReq <= minCount {
+            minCount = totalReq
+        }
     }
+    for podname, totalReq := range podRequestCount {
+        if totalReq == minCount {
+            targetPods = append(targetPods, podname)
+        }
+    }
+    if len(targetPods) > 0 {
+        targetPod, _ = utils.FilterPodByName(targetPods[rand.Intn(len(targetPods))], readyPods)
+    }
+    return targetPod
+}
+
+// getRequestCounts returns the running request count for each pod tracked by the gateway.
+// Note: currently each gateway instance tracks active running request counts per pod locally;
+// if multiple gateway instances are active, this state is not shared across them.
+// It is advised to run on the leader gateway instance.
+// TODO: Support stateful information sync across gateway instances: https://github.com/vllm-project/aibrix/issues/761
+func getRequestCounts(cache cache.Cache, readyPods []*v1.Pod) map[string]int {
+    podRequestCount := map[string]int{}
+    for _, pod := range readyPods {
+        runningReq, err := cache.GetMetricValueByPod(pod.Name, metrics.RealtimeNumRequestsRunning)
+        if err != nil {
+            runningReq = &metrics.SimpleMetricValue{Value: 0}
+        }
+        podRequestCount[pod.Name] = int(runningReq.GetSimpleValue())
+    }
+
+    return podRequestCount
 }
```
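
One design note: `selectTargetPodWithLeastRequestCount` makes two passes, first finding the minimum count and then collecting every pod at that minimum, so ties are broken uniformly at random rather than always favoring map iteration order. A trimmed, self-contained sketch of the same pattern (hypothetical names):

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickLeastLoaded returns one pod name with the minimum request count,
// chosen uniformly at random among ties to avoid herding onto one pod.
func pickLeastLoaded(counts map[string]int) string {
	minCount := -1
	for _, n := range counts {
		if minCount < 0 || n < minCount {
			minCount = n
		}
	}
	var ties []string
	for name, n := range counts {
		if n == minCount {
			ties = append(ties, name)
		}
	}
	return ties[rand.Intn(len(ties))]
}

func main() {
	fmt.Println(pickLeastLoaded(map[string]int{"p1": 3, "p2": 3, "p3": 7})) // p1 or p2
}
```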
