Skip to content

Commit 842e36c

Browse files
committed
test: [torrust#1096] add E2E test for banning IPs sending bad connection IDs
1 parent fc57262 commit 842e36c

File tree

4 files changed

+83
-28
lines changed

4 files changed

+83
-28
lines changed

src/servers/udp/handlers.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ impl CookieTimeValues {
5353
/// - Delegating the request to the correct handler depending on the request type.
5454
///
5555
/// It will return an `Error` response if the request is invalid.
56-
#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, cbf), ret(level = Level::TRACE))]
56+
#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, connection_id_errors_per_ip), ret(level = Level::TRACE))]
5757
pub(crate) async fn handle_packet(
5858
udp_request: RawRequest,
5959
tracker: &Tracker,
6060
local_addr: SocketAddr,
6161
cookie_time_values: CookieTimeValues,
62-
cbf: Arc<RwLock<CountingBloomFilter>>,
62+
connection_id_errors_per_ip: Arc<RwLock<CountingBloomFilter>>,
6363
) -> Response {
6464
tracing::Span::current().record("request_id", Uuid::new_v4().to_string());
6565
tracing::debug!("Handling Packets: {udp_request:?}");
@@ -76,7 +76,10 @@ pub(crate) async fn handle_packet(
7676
| Error::CookieValueExpired { .. }
7777
| Error::CookieValueFromFuture { .. } => {
7878
// code-review: should we include `RequestParseError` and `BadRequest`?
79-
cbf.write().await.insert(&udp_request.from.ip().to_string());
79+
connection_id_errors_per_ip
80+
.write()
81+
.await
82+
.insert(&udp_request.from.ip().to_string());
8083
}
8184
_ => {}
8285
}

src/servers/udp/server/launcher.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl Launcher {
121121

122122
// Create a counting bloom filter that uses 4 bits per element and has a
123123
// false positive rate of 0.01 when 100 items have been inserted
124-
let cbf = Arc::new(RwLock::new(CountingBloomFilter::with_rate(4, 0.01, 100)));
124+
let connection_id_errors_per_ip = Arc::new(RwLock::new(CountingBloomFilter::with_rate(4, 0.01, 100)));
125125

126126
let addr = receiver.bound_socket_address();
127127
let local_addr = format!("udp://{addr}");
@@ -158,7 +158,10 @@ impl Launcher {
158158
}
159159
}
160160

161-
let connection_id_errors_from_ip = cbf.read().await.estimate_count(&req.from.ip().to_string());
161+
let connection_id_errors_from_ip = connection_id_errors_per_ip
162+
.read()
163+
.await
164+
.estimate_count(&req.from.ip().to_string());
162165

163166
if connection_id_errors_from_ip > MAX_CONNECTION_ID_ERRORS_PER_IP {
164167
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)");
@@ -177,7 +180,7 @@ impl Launcher {
177180
// chance to finish. However, the buffer is yielding before
178181
// aborting one tasks, giving it the chance to finish.
179182
let abort_handle: tokio::task::AbortHandle =
180-
tokio::task::spawn(processor.process_request(req, cbf.clone())).abort_handle();
183+
tokio::task::spawn(processor.process_request(req, connection_id_errors_per_ip.clone())).abort_handle();
181184

182185
if abort_handle.is_finished() {
183186
continue;

src/servers/udp/server/processor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ impl Processor {
2727
}
2828
}
2929

30-
#[instrument(skip(self, request, cbf))]
31-
pub async fn process_request(self, request: RawRequest, cbf: Arc<RwLock<CountingBloomFilter>>) {
30+
#[instrument(skip(self, request, connection_id_errors_per_ip))]
31+
pub async fn process_request(self, request: RawRequest, connection_id_errors_per_ip: Arc<RwLock<CountingBloomFilter>>) {
3232
let from = request.from;
3333
let response = handlers::handle_packet(
3434
request,
3535
&self.tracker,
3636
self.socket.address(),
3737
CookieTimeValues::new(self.cookie_lifetime),
38-
cbf,
38+
connection_id_errors_per_ip,
3939
)
4040
.await;
4141

tests/servers/udp/contract.rs

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,31 @@ mod receiving_an_announce_request {
130130
use crate::servers::udp::contract::send_connection_request;
131131
use crate::servers::udp::Started;
132132

133-
pub async fn send_and_get_announce(tx_id: TransactionId, c_id: ConnectionId, client: &UdpTrackerClient) {
134-
// Send announce request
133+
pub async fn assert_send_and_get_announce(tx_id: TransactionId, c_id: ConnectionId, client: &UdpTrackerClient) {
134+
let response = send_and_get_announce(tx_id, c_id, client).await;
135+
assert!(is_ipv4_announce_response(&response));
136+
}
137+
138+
pub async fn send_and_get_announce(
139+
tx_id: TransactionId,
140+
c_id: ConnectionId,
141+
client: &UdpTrackerClient,
142+
) -> aquatic_udp_protocol::Response {
143+
let announce_request = build_sample_announce_request(tx_id, c_id, client.client.socket.local_addr().unwrap().port());
144+
145+
match client.send(announce_request.into()).await {
146+
Ok(_) => (),
147+
Err(err) => panic!("{err}"),
148+
};
135149

136-
let announce_request = AnnounceRequest {
150+
match client.receive().await {
151+
Ok(response) => response,
152+
Err(err) => panic!("{err}"),
153+
}
154+
}
155+
156+
fn build_sample_announce_request(tx_id: TransactionId, c_id: ConnectionId, port: u16) -> AnnounceRequest {
157+
AnnounceRequest {
137158
connection_id: ConnectionId(c_id.0),
138159
action_placeholder: AnnounceActionPlaceholder::default(),
139160
transaction_id: tx_id,
@@ -146,26 +167,34 @@ mod receiving_an_announce_request {
146167
ip_address: Ipv4Addr::new(0, 0, 0, 0).into(),
147168
key: PeerKey::new(0i32),
148169
peers_wanted: NumberOfPeers(1i32.into()),
149-
port: Port(client.client.socket.local_addr().unwrap().port().into()),
150-
};
170+
port: Port(port.into()),
171+
}
172+
}
151173

152-
match client.send(announce_request.into()).await {
153-
Ok(_) => (),
154-
Err(err) => panic!("{err}"),
155-
};
174+
#[tokio::test]
175+
async fn should_return_an_announce_response() {
176+
INIT.call_once(|| {
177+
tracing_stderr_init(LevelFilter::ERROR);
178+
});
156179

157-
let response = match client.receive().await {
158-
Ok(response) => response,
180+
let env = Started::new(&configuration::ephemeral().into()).await;
181+
182+
let client = match UdpTrackerClient::new(env.bind_address(), DEFAULT_TIMEOUT).await {
183+
Ok(udp_tracker_client) => udp_tracker_client,
159184
Err(err) => panic!("{err}"),
160185
};
161186

162-
// println!("test response {response:?}");
187+
let tx_id = TransactionId::new(123);
163188

164-
assert!(is_ipv4_announce_response(&response));
189+
let c_id = send_connection_request(tx_id, &client).await;
190+
191+
assert_send_and_get_announce(tx_id, c_id, &client).await;
192+
193+
env.stop().await;
165194
}
166195

167196
#[tokio::test]
168-
async fn should_return_an_announce_response() {
197+
async fn should_return_many_announce_response() {
169198
INIT.call_once(|| {
170199
tracing_stderr_init(LevelFilter::ERROR);
171200
});
@@ -181,13 +210,16 @@ mod receiving_an_announce_request {
181210

182211
let c_id = send_connection_request(tx_id, &client).await;
183212

184-
send_and_get_announce(tx_id, c_id, &client).await;
213+
for x in 0..1000 {
214+
tracing::info!("req no: {x}");
215+
assert_send_and_get_announce(tx_id, c_id, &client).await;
216+
}
185217

186218
env.stop().await;
187219
}
188220

189221
#[tokio::test]
190-
async fn should_return_many_announce_response() {
222+
async fn should_ban_the_client_ip_if_it_sends_more_than_10_requests_with_a_cookie_value_not_normal() {
191223
INIT.call_once(|| {
192224
tracing_stderr_init(LevelFilter::ERROR);
193225
});
@@ -201,13 +233,30 @@ mod receiving_an_announce_request {
201233

202234
let tx_id = TransactionId::new(123);
203235

204-
let c_id = send_connection_request(tx_id, &client).await;
236+
// The eleven first requests should be fine
205237

206-
for x in 0..1000 {
238+
let invalid_connection_id = ConnectionId::new(0); // Zero is one of the not normal values.
239+
240+
for x in 0..=10 {
207241
tracing::info!("req no: {x}");
208-
send_and_get_announce(tx_id, c_id, &client).await;
242+
send_and_get_announce(tx_id, invalid_connection_id, &client).await;
209243
}
210244

245+
// The twelfth request should be banned (timeout error)
246+
247+
let announce_request = build_sample_announce_request(
248+
tx_id,
249+
invalid_connection_id,
250+
client.client.socket.local_addr().unwrap().port(),
251+
);
252+
253+
match client.send(announce_request.into()).await {
254+
Ok(_) => (),
255+
Err(err) => panic!("{err}"),
256+
};
257+
258+
assert!(client.receive().await.is_err());
259+
211260
env.stop().await;
212261
}
213262
}

0 commit comments

Comments
 (0)