Skip to content

Commit 719ff00

Browse files
committed
Add response simple rate
1 parent 43096a8 commit 719ff00

File tree

5 files changed

+115
-12
lines changed

5 files changed

+115
-12
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
[package]
22
name = "ws_stress"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
edition = "2021"
55
authors = ["konstantin Solovev"]
66

77
[dependencies]
88
clap = { version = "4", features = ["derive"] }
99
tokio = { version = "1.17", features = ["full"] }
1010
tokio-tungstenite = { version = "0.19.0", features = ["rustls-tls-native-roots"]}
11-
#tokio-rustls = { version = "0.23.4", default-features = false, features = ["logging"] }
1211
futures = "0.3"
1312
human_bytes = { version = "0.4.2", default-features = false }
1413
hashbrown = "0.14"
14+
rand = "0.8"
1515

README.MD

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,64 @@
11

2+
# Simple stress test tool for websockets
23

3-
cargo run -- --addr wss://0.0.0.0:8080 -m 10000 -c 1 '{"foo":"bar"}'
4+
```
5+
Usage: ws_stress [OPTIONS] --addr <ADDR> [MESSAGE]
6+
7+
Arguments:
8+
[MESSAGE] Message to send [default: "hello world!"]
9+
10+
Options:
11+
-a, --addr <ADDR>
12+
The websocket address to connect to, e.g. ws://0.0.0.0:8080
13+
-c, --connections <CONNECTIONS>
14+
How many concurrent sockets to open [default: 1]
15+
-m, --messages <MESSAGES>
16+
How many messages send per 1 socket [default: 1]
17+
-e, --log-errors
18+
Collect and print errors in the end of the tests
19+
-r, --resp-sample-rate <RESP_SAMPLE_RATE>
20+
Show responses from the server with the specified sample rate [0-1] [default: 0.0]
21+
-h, --help
22+
Print help
23+
-V, --version
24+
Print version
25+
26+
```
27+
28+
## Examples
29+
30+
### Simple run: 1 connection send 1 default message
31+
32+
```bash
33+
./ws_stress --addr ws://0.0.0.0:8080
34+
```
35+
Benchmarking: 1 messages, 1 connections
36+
Errors: {}
37+
Benchmark results:
38+
Ok: 1
39+
Err: 0
40+
Total: 1
41+
Elapsed time: 0.00 s
42+
Connections: 1
43+
Msg len: 12 B
44+
Requests per second: 11810.98 rps
45+
Throughput: 1.13 Mbit/sec
46+
47+
### Stress test: 1 connection sends 100000 specifies json messages
48+
49+
```bash
50+
./ws_stress --addr ws://0.0.0.0:8080 '{"traceId":"2644795231141946144","correlationId":121212121212,"payload":{"type":"Foo","data":[{"time":"2023-01-01T10:28:31.236453182Z","amount":0.01},{"time":"2023-02-02T10:28:31.236453182Z","amount":0.0051}]}}' \
51+
-m 100000 -c 1 -e
52+
```
53+
Benchmarking: 100000 messages, 1 connections
54+
Errors: {}
55+
Benchmark results:
56+
Ok: 100000
57+
Err: 0
58+
Total: 100000
59+
Elapsed time: 0.55 s
60+
Connections: 1
61+
Msg len: 511 B
62+
Requests per second: 183013.38 rps
63+
Throughput: 748.16 Mbit/sec
64+

src/args.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,35 @@ use clap::Parser;
33
#[derive(Parser, Debug)]
44
#[command(author, version, about, long_about = None)]
55
pub struct Args {
6-
/// The websocket address to connect to
6+
/// The websocket address to connect to, e.g. ws://0.0.0.0:8080
77
#[arg(short, long)]
88
pub addr: String,
99

10-
/// How many concurrent sockets to use
10+
/// How many concurrent sockets to open
1111
#[arg(short, long, default_value = "1")]
1212
pub connections: usize,
1313

14-
/// How many messages send per socket
14+
/// How many messages send per 1 socket
1515
#[arg(short, long, default_value = "1")]
1616
pub messages: usize,
1717

1818
/// Message to send
1919
#[arg(default_value = "hello world!")]
2020
pub message: String,
2121

22-
#[arg(short, long, default_value = "false")]
22+
/// Collect and print errors in the end of the tests
23+
#[arg(short = 'e', long, default_value = "false")]
2324
pub log_errors: bool,
25+
26+
/// Show responses from the server with the specified sample rate [0-1]
27+
#[arg(short = 'r', long, default_value = "0.0", value_parser = parse_sample_rate)]
28+
pub resp_sample_rate: f64,
29+
}
30+
31+
fn parse_sample_rate(src: &str) -> Result<f64, String> {
32+
let rate = src.parse::<f64>().map_err(|err| err.to_string())?;
33+
if !(0.0..=1.0).contains(&rate) {
34+
return Err(format!("Sample rate must be in range [0-1], got {}", rate));
35+
}
36+
Ok(rate)
2437
}

src/main.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ use std::{
99
};
1010

1111
use clap::Parser;
12-
use futures::SinkExt;
12+
use futures::stream::SplitStream;
13+
use futures::{SinkExt, StreamExt};
1314
use human_bytes::human_bytes;
14-
use tokio_tungstenite::connect_async;
15+
use rand::Rng;
16+
use tokio::net::TcpStream;
1517
use tokio_tungstenite::tungstenite::Message::Text;
18+
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
1619

1720
#[tokio::main]
1821
async fn main() {
@@ -28,14 +31,18 @@ async fn main() {
2831
let msg = args.message.clone();
2932
let barrier = barrier.clone();
3033
let task = tokio::spawn(async move {
31-
let mut client_tungstenite = connect_async(addr).await.unwrap().0;
34+
let client_tungstenite = connect_async(addr).await.expect("Failed to connect").0;
35+
let (mut tx, rx) = client_tungstenite.split();
36+
37+
read_responses(rx, args.resp_sample_rate);
38+
3239
barrier.wait().await;
3340

3441
// Async, quite fast, with feedback (send_all is faster but we can't count
3542
// messages)
3643
let mut map = hashbrown::HashMap::new();
3744
for _ in 0..args.messages {
38-
match client_tungstenite.send(Text(msg.clone())).await {
45+
match tx.feed(Text(msg.clone())).await {
3946
Ok(_) => {
4047
ok_counter.fetch_add(1, Ordering::Relaxed);
4148
}
@@ -52,7 +59,7 @@ async fn main() {
5259
}
5360
}
5461
}
55-
62+
tx.flush().await.expect("flush failed");
5663
println!("Errors: {:?}", map);
5764
});
5865
tasks.push(task);
@@ -92,3 +99,24 @@ async fn main() {
9299
fn string_to_static_str(s: &str) -> &'static str {
93100
Box::leak(s.to_string().into_boxed_str())
94101
}
102+
103+
fn read_responses(
104+
mut rx: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
105+
sample_rate: f64,
106+
) {
107+
if sample_rate == 0.0 {
108+
return;
109+
}
110+
111+
// read all messages from rx and write in into stdout
112+
tokio::spawn(async move {
113+
while let Some(msg) = rx.next().await {
114+
if sample_rate == 1.0 || rand::thread_rng().gen_bool(sample_rate) {
115+
let received = msg.expect("Failed to receive a message");
116+
if received.is_text() {
117+
println!("> {}", received.into_text().unwrap());
118+
}
119+
};
120+
}
121+
});
122+
}

0 commit comments

Comments
 (0)