@@ -3,10 +3,11 @@ use std::sync::Arc;
3
3
use std:: time:: Duration ;
4
4
5
5
use bittorrent_tracker_client:: udp:: client:: check;
6
+ use bloom:: CountingBloomFilter ;
6
7
use derive_more:: Constructor ;
7
8
use futures_util:: StreamExt ;
8
9
use tokio:: select;
9
- use tokio:: sync:: oneshot;
10
+ use tokio:: sync:: { oneshot, RwLock } ;
10
11
use tracing:: instrument;
11
12
12
13
use super :: request_buffer:: ActiveRequests ;
@@ -20,6 +21,10 @@ use crate::servers::udp::server::processor::Processor;
20
21
use crate :: servers:: udp:: server:: receiver:: Receiver ;
21
22
use crate :: servers:: udp:: UDP_TRACKER_LOG_TARGET ;
22
23
24
+ /// The maximum number of connection id errors per ip. Clients will be banned if
25
+ /// they exceed this limit.
26
+ const MAX_CONNECTION_ID_ERRORS_PER_IP : u32 = 10 ;
27
+
23
28
/// A UDP server instance launcher.
24
29
#[ derive( Constructor ) ]
25
30
pub struct Launcher ;
@@ -114,6 +119,10 @@ impl Launcher {
114
119
async fn run_udp_server_main ( mut receiver : Receiver , tracker : Arc < Tracker > , cookie_lifetime : Duration ) {
115
120
let active_requests = & mut ActiveRequests :: default ( ) ;
116
121
122
+ // Create a counting bloom filter that uses 4 bits per element and has a
123
+ // false positive rate of 0.01 when 100 items have been inserted
124
+ let cbf = Arc :: new ( RwLock :: new ( CountingBloomFilter :: with_rate ( 4 , 0.01 , 100 ) ) ) ;
125
+
117
126
let addr = receiver. bound_socket_address ( ) ;
118
127
let local_addr = format ! ( "udp://{addr}" ) ;
119
128
@@ -140,6 +149,13 @@ impl Launcher {
140
149
}
141
150
} ;
142
151
152
+ let connection_id_errors_from_ip = cbf. read ( ) . await . estimate_count ( & req. from . ip ( ) . to_string ( ) ) ;
153
+
154
+ if connection_id_errors_from_ip > MAX_CONNECTION_ID_ERRORS_PER_IP {
155
+ tracing:: debug!( target: UDP_TRACKER_LOG_TARGET , local_addr, "Udp::run_udp_server::loop continue: (banned ip)" ) ;
156
+ continue ;
157
+ }
158
+
143
159
// We spawn the new task even if there active requests buffer is
144
160
// full. This could seem counterintuitive because we are accepting
145
161
// more request and consuming more memory even if the server is
@@ -151,7 +167,8 @@ impl Launcher {
151
167
// are only adding and removing tasks without given them the
152
168
// chance to finish. However, the buffer is yielding before
153
169
// aborting one tasks, giving it the chance to finish.
154
- let abort_handle: tokio:: task:: AbortHandle = tokio:: task:: spawn ( processor. process_request ( req) ) . abort_handle ( ) ;
170
+ let abort_handle: tokio:: task:: AbortHandle =
171
+ tokio:: task:: spawn ( processor. process_request ( req, cbf. clone ( ) ) ) . abort_handle ( ) ;
155
172
156
173
if abort_handle. is_finished ( ) {
157
174
continue ;
0 commit comments