Skip to content

Commit 78273a6

Browse files
perf: try to send as many data as possible using write_chunks() (WIP)
1 parent b5218dd commit 78273a6

File tree

1 file changed

+93
-11
lines changed

1 file changed

+93
-11
lines changed

perf/src/bin/perf_client.rs

Lines changed: 93 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -417,22 +417,104 @@ async fn request(
417417

418418
let send_stream_stats = stream_stats.new_sender(&send, upload);
419419

420-
static DATA: [u8; 1024 * 1024] = [42; 1024 * 1024];
420+
static DATA: [u8; 1024 * 1024] = [42; 1024 * 1024]; // 1MB of data
421+
let unrolled = true;
422+
421423
let mut remaining = upload;
422424
let upload_start = Instant::now();
423425
while remaining > 0 {
424-
let chunk_len = remaining.min(DATA.len() as u64);
426+
if unrolled {
427+
#[rustfmt::skip]
428+
let mut data_chunks = [ // 32 MB of data
429+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
430+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
431+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
432+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
433+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
434+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
435+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
436+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
437+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
438+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
439+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
440+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
441+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
442+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
443+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
444+
Bytes::from_static(&DATA), Bytes::from_static(&DATA),
445+
];
446+
let one_data_chunks_len = data_chunks[0].len() as u64;
447+
let all_data_chunks_len = data_chunks[..data_chunks.len()]
448+
.iter()
449+
.map(|b| b.len() as u64)
450+
.sum::<u64>();
451+
452+
if remaining > all_data_chunks_len {
453+
// send all chunks at the same time
454+
tokio::select! {
455+
biased;
456+
_ = shutdown.cancelled() => {
457+
break;
458+
},
459+
res = send.write_chunks(&mut data_chunks) => {
460+
let res = res.context("sending all chunks")?;
461+
462+
info!("sent {} chunks for {} bytes remaining {remaining}", res.chunks, res.bytes);
463+
464+
send_stream_stats.on_bytes(res.bytes);
465+
remaining -= res.bytes as u64;
466+
}
467+
}
468+
} else if remaining <= one_data_chunks_len {
469+
// manually send remaining data
470+
let chunk_len = remaining.min(DATA.len() as u64);
471+
472+
tokio::select! {
473+
biased;
474+
_ = shutdown.cancelled() => {
475+
break;
476+
},
477+
res = send.write_chunk(Bytes::from_static(&DATA[..chunk_len as usize])) => {
478+
res.context("sending response")?;
479+
480+
info!("sent {chunk_len} bytes remaining {remaining}");
481+
482+
send_stream_stats.on_bytes(chunk_len as usize);
483+
remaining -= chunk_len;
484+
}
485+
}
486+
} else {
487+
// send a bunch of chunks but not all
488+
let chunk_count = remaining / one_data_chunks_len;
489+
tokio::select! {
490+
biased;
491+
_ = shutdown.cancelled() => {
492+
break;
493+
},
494+
res = send.write_chunks(&mut data_chunks[..chunk_count as usize]) => {
495+
let res = res.context("sending some chunks")?;
496+
497+
info!("sent {} chunks for {} bytes remaining {remaining}", res.chunks, res.bytes);
498+
499+
send_stream_stats.on_bytes(res.bytes);
500+
remaining -= res.bytes as u64;
501+
}
502+
}
503+
}
504+
} else {
505+
let chunk_len = remaining.min(DATA.len() as u64);
425506

426-
tokio::select! {
427-
biased;
428-
_ = shutdown.cancelled() => {
429-
break;
430-
},
431-
res = send.write_chunk(Bytes::from_static(&DATA[..chunk_len as usize])) => {
432-
res.context("sending response")?;
507+
tokio::select! {
508+
biased;
509+
_ = shutdown.cancelled() => {
510+
break;
511+
},
512+
res = send.write_chunk(Bytes::from_static(&DATA[..chunk_len as usize])) => {
513+
res.context("sending response")?;
433514

434-
send_stream_stats.on_bytes(chunk_len as usize);
435-
remaining -= chunk_len;
515+
send_stream_stats.on_bytes(chunk_len as usize);
516+
remaining -= chunk_len;
517+
}
436518
}
437519
}
438520
}

0 commit comments

Comments
 (0)