Skip to content

Commit 21bea5b

Browse files
committed
refactor: [torrust#1456] increase ban counters asyncronously
1 parent ad1b19a commit 21bea5b

File tree

5 files changed

+65
-18
lines changed

5 files changed

+65
-18
lines changed

packages/udp-tracker-server/src/environment.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl Environment<Stopped> {
8282
let udp_server_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
8383
self.container.udp_tracker_server_container.event_bus.receiver(),
8484
&self.container.udp_tracker_server_container.stats_repository,
85+
&self.container.udp_tracker_core_container.ban_service,
8586
));
8687

8788
// Start the UDP tracker server

packages/udp-tracker-server/src/handlers/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use announce::handle_announce;
1313
use aquatic_udp_protocol::{Request, Response, TransactionId};
1414
use bittorrent_tracker_core::MAX_SCRAPE_TORRENTS;
1515
use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer;
16-
use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError;
1716
use connect::handle_connect;
1817
use error::handle_error;
1918
use scrape::handle_scrape;
@@ -84,15 +83,6 @@ pub(crate) async fn handle_packet(
8483
{
8584
Ok((response, req_kid)) => return (response, Some(req_kid)),
8685
Err((error, transaction_id, req_kind)) => {
87-
if let Error::UdpAnnounceError {
88-
source: UdpAnnounceError::ConnectionCookieError { .. },
89-
} = error
90-
{
91-
// code-review: should we include `RequestParseError` and `BadRequest`?
92-
let mut ban_service = udp_tracker_core_container.ban_service.write().await;
93-
ban_service.increase_counter(&udp_request.from.ip());
94-
}
95-
9686
let response = handle_error(
9787
Some(req_kind.clone()),
9888
udp_request.from,

packages/udp-tracker-server/src/statistics/event/handler.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
use std::sync::Arc;
2+
3+
use bittorrent_udp_tracker_core::services::banning::BanService;
4+
use tokio::sync::RwLock;
15
use torrust_tracker_metrics::label::{LabelSet, LabelValue};
26
use torrust_tracker_metrics::{label_name, metric_name};
37
use torrust_tracker_primitives::DurationSinceUnixEpoch;
48

5-
use crate::event::{Event, UdpRequestKind, UdpResponseKind};
9+
use crate::event::{ErrorKind, Event, UdpRequestKind, UdpResponseKind};
610
use crate::statistics::repository::Repository;
711
use crate::statistics::{
812
UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
@@ -16,7 +20,12 @@ use crate::statistics::{
1620
/// This function panics if the client IP version does not match the expected
1721
/// version.
1822
#[allow(clippy::too_many_lines)]
19-
pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) {
23+
pub async fn handle_event(
24+
event: Event,
25+
stats_repository: &Repository,
26+
ban_service: &Arc<RwLock<BanService>>,
27+
now: DurationSinceUnixEpoch,
28+
) {
2029
match event {
2130
Event::UdpRequestAborted { context } => {
2231
// Global fixed metrics
@@ -232,7 +241,14 @@ pub async fn handle_event(event: Event, stats_repository: &Repository, now: Dura
232241
Err(err) => tracing::error!("Failed to increase the counter: {}", err),
233242
};
234243
}
235-
Event::UdpError { context, kind, error: _ } => {
244+
Event::UdpError { context, kind, error } => {
245+
// Increase the number of errors
246+
// code-review: should we ban IP due to other errors too?
247+
if let ErrorKind::ConnectionCookie(_msg) = error {
248+
let mut ban_service = ban_service.write().await;
249+
ban_service.increase_counter(&context.client_socket_addr().ip());
250+
}
251+
236252
// Global fixed metrics
237253
match context.client_socket_addr().ip() {
238254
std::net::IpAddr::V4(_) => {
@@ -267,7 +283,9 @@ pub async fn handle_event(event: Event, stats_repository: &Repository, now: Dura
267283
#[cfg(test)]
268284
mod tests {
269285
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
286+
use std::sync::Arc;
270287

288+
use bittorrent_udp_tracker_core::services::banning::BanService;
271289
use torrust_tracker_clock::clock::Time;
272290
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};
273291

@@ -279,6 +297,7 @@ mod tests {
279297
#[tokio::test]
280298
async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() {
281299
let stats_repository = Repository::new();
300+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
282301

283302
handle_event(
284303
Event::UdpRequestAborted {
@@ -292,6 +311,7 @@ mod tests {
292311
),
293312
},
294313
&stats_repository,
314+
&ban_service,
295315
CurrentClock::now(),
296316
)
297317
.await;
@@ -304,6 +324,7 @@ mod tests {
304324
#[tokio::test]
305325
async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() {
306326
let stats_repository = Repository::new();
327+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
307328

308329
handle_event(
309330
Event::UdpRequestBanned {
@@ -317,6 +338,7 @@ mod tests {
317338
),
318339
},
319340
&stats_repository,
341+
&ban_service,
320342
CurrentClock::now(),
321343
)
322344
.await;
@@ -329,6 +351,7 @@ mod tests {
329351
#[tokio::test]
330352
async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() {
331353
let stats_repository = Repository::new();
354+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
332355

333356
handle_event(
334357
Event::UdpRequestReceived {
@@ -342,6 +365,7 @@ mod tests {
342365
),
343366
},
344367
&stats_repository,
368+
&ban_service,
345369
CurrentClock::now(),
346370
)
347371
.await;
@@ -354,6 +378,7 @@ mod tests {
354378
#[tokio::test]
355379
async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() {
356380
let stats_repository = Repository::new();
381+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
357382

358383
handle_event(
359384
Event::UdpRequestAborted {
@@ -367,6 +392,7 @@ mod tests {
367392
),
368393
},
369394
&stats_repository,
395+
&ban_service,
370396
CurrentClock::now(),
371397
)
372398
.await;
@@ -376,6 +402,7 @@ mod tests {
376402
#[tokio::test]
377403
async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() {
378404
let stats_repository = Repository::new();
405+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
379406

380407
handle_event(
381408
Event::UdpRequestBanned {
@@ -389,6 +416,7 @@ mod tests {
389416
),
390417
},
391418
&stats_repository,
419+
&ban_service,
392420
CurrentClock::now(),
393421
)
394422
.await;
@@ -399,6 +427,7 @@ mod tests {
399427
#[tokio::test]
400428
async fn should_increase_the_udp4_connect_requests_counter_when_it_receives_a_udp4_request_event_of_connect_kind() {
401429
let stats_repository = Repository::new();
430+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
402431

403432
handle_event(
404433
Event::UdpRequestAccepted {
@@ -413,6 +442,7 @@ mod tests {
413442
kind: crate::event::UdpRequestKind::Connect,
414443
},
415444
&stats_repository,
445+
&ban_service,
416446
CurrentClock::now(),
417447
)
418448
.await;
@@ -425,6 +455,7 @@ mod tests {
425455
#[tokio::test]
426456
async fn should_increase_the_udp4_announce_requests_counter_when_it_receives_a_udp4_request_event_of_announce_kind() {
427457
let stats_repository = Repository::new();
458+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
428459

429460
handle_event(
430461
Event::UdpRequestAccepted {
@@ -439,6 +470,7 @@ mod tests {
439470
kind: crate::event::UdpRequestKind::Announce,
440471
},
441472
&stats_repository,
473+
&ban_service,
442474
CurrentClock::now(),
443475
)
444476
.await;
@@ -451,6 +483,7 @@ mod tests {
451483
#[tokio::test]
452484
async fn should_increase_the_udp4_scrape_requests_counter_when_it_receives_a_udp4_request_event_of_scrape_kind() {
453485
let stats_repository = Repository::new();
486+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
454487

455488
handle_event(
456489
Event::UdpRequestAccepted {
@@ -465,6 +498,7 @@ mod tests {
465498
kind: crate::event::UdpRequestKind::Scrape,
466499
},
467500
&stats_repository,
501+
&ban_service,
468502
CurrentClock::now(),
469503
)
470504
.await;
@@ -477,6 +511,7 @@ mod tests {
477511
#[tokio::test]
478512
async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() {
479513
let stats_repository = Repository::new();
514+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
480515

481516
handle_event(
482517
Event::UdpResponseSent {
@@ -494,6 +529,7 @@ mod tests {
494529
req_processing_time: std::time::Duration::from_secs(1),
495530
},
496531
&stats_repository,
532+
&ban_service,
497533
CurrentClock::now(),
498534
)
499535
.await;
@@ -506,6 +542,7 @@ mod tests {
506542
#[tokio::test]
507543
async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() {
508544
let stats_repository = Repository::new();
545+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
509546

510547
handle_event(
511548
Event::UdpError {
@@ -521,6 +558,7 @@ mod tests {
521558
error: ErrorKind::RequestParse("Invalid request format".to_string()),
522559
},
523560
&stats_repository,
561+
&ban_service,
524562
CurrentClock::now(),
525563
)
526564
.await;
@@ -533,6 +571,7 @@ mod tests {
533571
#[tokio::test]
534572
async fn should_increase_the_udp6_connect_requests_counter_when_it_receives_a_udp6_request_event_of_connect_kind() {
535573
let stats_repository = Repository::new();
574+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
536575

537576
handle_event(
538577
Event::UdpRequestAccepted {
@@ -547,6 +586,7 @@ mod tests {
547586
kind: crate::event::UdpRequestKind::Connect,
548587
},
549588
&stats_repository,
589+
&ban_service,
550590
CurrentClock::now(),
551591
)
552592
.await;
@@ -559,6 +599,7 @@ mod tests {
559599
#[tokio::test]
560600
async fn should_increase_the_udp6_announce_requests_counter_when_it_receives_a_udp6_request_event_of_announce_kind() {
561601
let stats_repository = Repository::new();
602+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
562603

563604
handle_event(
564605
Event::UdpRequestAccepted {
@@ -573,6 +614,7 @@ mod tests {
573614
kind: crate::event::UdpRequestKind::Announce,
574615
},
575616
&stats_repository,
617+
&ban_service,
576618
CurrentClock::now(),
577619
)
578620
.await;
@@ -585,6 +627,7 @@ mod tests {
585627
#[tokio::test]
586628
async fn should_increase_the_udp6_scrape_requests_counter_when_it_receives_a_udp6_request_event_of_scrape_kind() {
587629
let stats_repository = Repository::new();
630+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
588631

589632
handle_event(
590633
Event::UdpRequestAccepted {
@@ -599,6 +642,7 @@ mod tests {
599642
kind: crate::event::UdpRequestKind::Scrape,
600643
},
601644
&stats_repository,
645+
&ban_service,
602646
CurrentClock::now(),
603647
)
604648
.await;
@@ -611,6 +655,7 @@ mod tests {
611655
#[tokio::test]
612656
async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() {
613657
let stats_repository = Repository::new();
658+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
614659

615660
handle_event(
616661
Event::UdpResponseSent {
@@ -628,6 +673,7 @@ mod tests {
628673
req_processing_time: std::time::Duration::from_secs(1),
629674
},
630675
&stats_repository,
676+
&ban_service,
631677
CurrentClock::now(),
632678
)
633679
.await;
@@ -639,6 +685,7 @@ mod tests {
639685
#[tokio::test]
640686
async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() {
641687
let stats_repository = Repository::new();
688+
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
642689

643690
handle_event(
644691
Event::UdpError {
@@ -654,6 +701,7 @@ mod tests {
654701
error: ErrorKind::RequestParse("Invalid request format".to_string()),
655702
},
656703
&stats_repository,
704+
&ban_service,
657705
CurrentClock::now(),
658706
)
659707
.await;

packages/udp-tracker-server/src/statistics/event/listener.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::sync::Arc;
22

3+
use bittorrent_udp_tracker_core::services::banning::BanService;
34
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
5+
use tokio::sync::RwLock;
46
use tokio::task::JoinHandle;
57
use torrust_tracker_clock::clock::Time;
68
use torrust_tracker_events::receiver::RecvError;
@@ -11,19 +13,24 @@ use crate::statistics::repository::Repository;
1113
use crate::CurrentClock;
1214

1315
#[must_use]
14-
pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> JoinHandle<()> {
15-
let stats_repository = repository.clone();
16+
pub fn run_event_listener(
17+
receiver: Receiver,
18+
repository: &Arc<Repository>,
19+
ban_service: &Arc<RwLock<BanService>>,
20+
) -> JoinHandle<()> {
21+
let repository_clone = repository.clone();
22+
let ban_service_clone = ban_service.clone();
1623

1724
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener");
1825

1926
tokio::spawn(async move {
20-
dispatch_events(receiver, stats_repository).await;
27+
dispatch_events(receiver, repository_clone, ban_service_clone).await;
2128

2229
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener finished");
2330
})
2431
}
2532

26-
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
33+
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>, ban_service: Arc<RwLock<BanService>>) {
2734
let shutdown_signal = tokio::signal::ctrl_c();
2835
tokio::pin!(shutdown_signal);
2936

@@ -38,7 +45,7 @@ async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repositor
3845

3946
result = receiver.recv() => {
4047
match result {
41-
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
48+
Ok(event) => handle_event(event, &stats_repository, &ban_service, CurrentClock::now()).await,
4249
Err(e) => {
4350
match e {
4451
RecvError::Closed => {

src/bootstrap/jobs/udp_tracker_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub fn start_event_listener(config: &Configuration, app_container: &Arc<AppConta
1010
let job = torrust_udp_tracker_server::statistics::event::listener::run_event_listener(
1111
app_container.udp_tracker_server_container.event_bus.receiver(),
1212
&app_container.udp_tracker_server_container.stats_repository,
13+
&app_container.udp_tracker_core_services.ban_service,
1314
);
1415
Some(job)
1516
} else {

0 commit comments

Comments
 (0)