Skip to content

Commit 1079277

Browse files
committed
fix tests
1 parent 524ced4 commit 1079277

File tree

15 files changed

+135
-118
lines changed

15 files changed

+135
-118
lines changed

crates/test/src/lib.rs

Lines changed: 31 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use quilkin::{
88
test::TestConfig,
99
};
1010
pub use serde_json::json;
11-
use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc};
11+
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
1212
use tokio::sync::mpsc;
1313

1414
pub mod xdp_util;
@@ -266,13 +266,10 @@ impl Pail {
266266
let pail = match spc.config {
267267
PailConfig::Server(sspc) => {
268268
let (packet_tx, packet_rx) = mpsc::channel::<String>(10);
269-
let socket = quilkin::net::DualStackEpollSocket::new(0)
270-
.expect("failed to create server socket");
269+
let socket =
270+
quilkin::net::Socket::polling_listen().expect("failed to create server socket");
271271

272-
let port = socket
273-
.local_addr()
274-
.expect("failed to bind server socket")
275-
.port();
272+
let port = socket.local_addr().port();
276273

277274
tracing::debug!(port, spc.name, "bound server socket");
278275

@@ -285,7 +282,7 @@ impl Pail {
285282

286283
while num_packets > 0 {
287284
let (size, _) = socket
288-
.recv_from(&mut buf)
285+
.recv_from(&mut *buf)
289286
.await
290287
.expect("failed to receive packet");
291288
received += size;
@@ -393,10 +390,7 @@ impl Pail {
393390
let (shutdown, shutdown_rx) =
394391
quilkin::signal::channel(quilkin::signal::ShutdownKind::Testing);
395392

396-
let port = quilkin::net::socket_port(
397-
&quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
398-
);
399-
393+
let port = quilkin::test::available_port();
400394
let config_path = path.clone();
401395
let config = Arc::new(Config::default_agent());
402396
config.dyn_cfg.id.store(Arc::new(spc.name.into()));
@@ -431,15 +425,6 @@ impl Pail {
431425
})
432426
}
433427
PailConfig::Proxy(ppc) => {
434-
let socket = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket");
435-
let qcmp =
436-
quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
437-
let qcmp_port = quilkin::net::socket_port(&qcmp);
438-
let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket");
439-
let phoenix_port = phoenix.port();
440-
441-
let port = quilkin::net::socket_port(&socket);
442-
443428
let management_servers = spc
444429
.dependencies
445430
.iter()
@@ -499,31 +484,25 @@ impl Pail {
499484
}
500485

501486
config.dyn_cfg.id.store(Arc::new(spc.name.into()));
502-
let pconfig = config.clone();
503-
504-
let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel();
505-
506-
let task = tokio::spawn(async move {
507-
components::proxy::Proxy {
508-
num_workers: NonZeroUsize::new(1).unwrap(),
509-
management_servers,
510-
socket: Some(socket),
511-
qcmp,
512-
phoenix,
513-
notifier: Some(rttx),
514-
..Default::default()
515-
}
516-
.run(
517-
RunArgs {
518-
config: pconfig,
519-
ready: Default::default(),
520-
shutdown_rx,
521-
},
522-
Some(tx),
523-
)
524-
.await
525-
});
526-
487+
let qcmp_port = quilkin::test::available_port();
488+
let phoenix_port = quilkin::test::available_port();
489+
let port = quilkin::test::available_port();
490+
491+
let _provider_task = quilkin::config::providersv2::Providers::default()
492+
.xds_endpoints(management_servers)
493+
.spawn_providers(&config, <_>::default(), None);
494+
495+
let task = quilkin::cli::Service::default()
496+
.udp()
497+
.udp_port(port)
498+
.qcmp()
499+
.qcmp_port(qcmp_port)
500+
.phoenix()
501+
.phoenix_port(phoenix_port)
502+
.spawn_services(&config, &shutdown_rx)
503+
.unwrap();
504+
505+
let _ = tx.send(());
527506
rx = Some(orx);
528507

529508
Self::Proxy(ProxyPail {
@@ -533,7 +512,7 @@ impl Pail {
533512
shutdown,
534513
task,
535514
config,
536-
delta_applies: Some(rtrx),
515+
delta_applies: None,
537516
})
538517
}
539518
};
@@ -680,20 +659,20 @@ impl Sandbox {
680659

681660
#[inline]
682661
pub fn socket(&self) -> (socket2::Socket, SocketAddr) {
683-
let socket = quilkin::net::raw_socket_with_reuse(0).unwrap();
684-
let port = quilkin::net::socket_port(&socket);
662+
let socket = quilkin::net::io::SystemSocket::listen().unwrap();
663+
let port = socket.port();
685664

686665
(
687-
socket,
666+
socket.into_inner(),
688667
SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, port)),
689668
)
690669
}
691670

692671
/// Creates an ephemeral socket that can be used to send messages to sandbox
693672
/// pails
694673
#[inline]
695-
pub fn client(&self) -> quilkin::net::DualStackEpollSocket {
696-
quilkin::net::DualStackEpollSocket::new(0).unwrap()
674+
pub fn client(&self) -> quilkin::net::Socket {
675+
quilkin::net::Socket::polling_listen().unwrap()
697676
}
698677

699678
/// Sleeps for the specified number of milliseconds

crates/test/src/xdp_util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use quilkin::net::xdp::process;
1+
use quilkin::net::io::nic::xdp::process;
22
use xdp::{Packet, packet::net_types::UdpHeaders};
33

44
#[inline]

crates/test/tests/mesh.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ trace_test!(relay_routing, {
112112
msg.extend_from_slice(&token.inner);
113113

114114
tracing::info!(%token, "sending packet");
115-
client.send_to(&msg, &proxy_address).await.unwrap();
115+
client.send_to(&*msg, proxy_address).await.unwrap();
116116

117117
assert_eq!(
118118
"hello",
@@ -124,7 +124,7 @@ trace_test!(relay_routing, {
124124
tracing::info!(%token, "sending bad packet");
125125
// send an invalid packet
126126
client
127-
.send_to(b"hello\xFF\xFF\xFF", &proxy_address)
127+
.send_to(&b"hello\xFF\xFF\xFF"[..], proxy_address)
128128
.await
129129
.unwrap();
130130

@@ -308,7 +308,7 @@ trace_test!(filter_update, {
308308
msg.extend_from_slice(&token);
309309

310310
tracing::info!(len = token.len(), "sending packet");
311-
client.send_to(&msg, &proxy_address).await.unwrap();
311+
client.send_to(&*msg, proxy_address).await.unwrap();
312312

313313
tracing::info!(len = token.len(), "received packet");
314314
assert_eq!(
@@ -320,7 +320,7 @@ trace_test!(filter_update, {
320320
// send an invalid packet
321321
msg.truncate(5);
322322
msg.extend((0..token.len()).map(|_| b'b'));
323-
client.send_to(&msg, &proxy_address).await.unwrap();
323+
client.send_to(msg, proxy_address).await.unwrap();
324324

325325
sandbox.expect_timeout(50, server_rx.recv()).await;
326326
tracing::info!(len = token.len(), "didn't receive bad packet");

crates/test/tests/proxy.rs

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use qt::*;
2-
use quilkin::{components::proxy, net, test::TestConfig};
2+
use quilkin::test::TestConfig;
33

44
trace_test!(server, {
55
let mut sc = qt::sandbox_config!();
@@ -20,7 +20,7 @@ trace_test!(server, {
2020

2121
let client = sb.client();
2222

23-
client.send_to(msg.as_bytes(), &addr).await.unwrap();
23+
client.send_to(msg.as_bytes(), addr).await.unwrap();
2424
assert_eq!(
2525
msg,
2626
sb.timeout(100, server1_rx.recv())
@@ -49,7 +49,7 @@ trace_test!(client, {
4949

5050
let msg = "hello";
5151
tracing::debug!(%local_addr, "sending packet");
52-
client.send_to(msg.as_bytes(), &local_addr).await.unwrap();
52+
client.send_to(msg.as_bytes(), local_addr).await.unwrap();
5353
assert_eq!(msg, sb.timeout(100, dest_rx.recv()).await.unwrap(),);
5454
});
5555

@@ -71,7 +71,7 @@ trace_test!(with_filter, {
7171
let client = sb.client();
7272

7373
let msg = "hello";
74-
client.send_to(msg.as_bytes(), &local_addr).await.unwrap();
74+
client.send_to(msg.as_bytes(), local_addr).await.unwrap();
7575

7676
// search for the filter strings.
7777
let result = sb.timeout(100, rx.recv()).await.unwrap();
@@ -95,22 +95,25 @@ trace_test!(uring_receiver, {
9595

9696
let socket = sb.client();
9797
let (ws, addr) = sb.socket();
98-
99-
let pending_sends = net::queue(1).unwrap();
100-
101-
// we'll test a single DownstreamReceiveWorkerConfig
102-
proxy::packet_router::DownstreamReceiveWorkerConfig {
103-
worker_id: 1,
104-
port: addr.port(),
105-
config: config.clone(),
106-
buffer_pool: quilkin::test::BUFFER_POOL.clone(),
107-
sessions: proxy::SessionPool::new(
108-
config,
109-
vec![pending_sends.0.clone()],
110-
BUFFER_POOL.clone(),
111-
),
112-
}
113-
.spawn(pending_sends)
98+
let backend = quilkin::net::io::Backend::Polling;
99+
let pending_sends = quilkin::net::packet::queue(1, backend).unwrap();
100+
101+
// we'll test a single Listener
102+
quilkin::net::io::completion::listen(
103+
quilkin::net::io::Listener {
104+
worker_id: 1,
105+
port: addr.port(),
106+
config: config.clone(),
107+
buffer_pool: quilkin::test::BUFFER_POOL.clone(),
108+
sessions: quilkin::net::sessions::SessionPool::new(
109+
config,
110+
vec![pending_sends.0.clone()],
111+
BUFFER_POOL.clone(),
112+
backend,
113+
),
114+
},
115+
pending_sends,
116+
)
114117
.expect("failed to spawn task");
115118

116119
// Drop the socket, otherwise it can
@@ -140,31 +143,37 @@ trace_test!(
140143
.unwrap()
141144
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));
142145

146+
let backend = quilkin::net::io::Backend::Polling;
143147
let pending_sends: Vec<_> = [
144-
net::queue(1).unwrap(),
145-
net::queue(1).unwrap(),
146-
net::queue(1).unwrap(),
148+
quilkin::net::packet::queue(1, backend).unwrap(),
149+
quilkin::net::packet::queue(1, backend).unwrap(),
150+
quilkin::net::packet::queue(1, backend).unwrap(),
147151
]
148152
.into_iter()
149153
.collect();
150154

151-
let sessions = proxy::SessionPool::new(
155+
let sessions = quilkin::net::sessions::SessionPool::new(
152156
config.clone(),
153157
pending_sends.iter().map(|ps| ps.0.clone()).collect(),
154158
BUFFER_POOL.clone(),
159+
backend,
155160
);
156161

157162
const WORKER_COUNT: usize = 3;
158163

159164
let (socket, addr) = sb.socket();
160-
proxy::packet_router::spawn_receivers(
161-
config,
162-
socket,
163-
pending_sends,
164-
&sessions,
165-
BUFFER_POOL.clone(),
166-
)
167-
.unwrap();
165+
let port = quilkin::net::io::SystemSocket::new(socket).port();
166+
for (worker_id, ws) in pending_sends.into_iter().enumerate() {
167+
let worker = quilkin::net::io::Listener {
168+
worker_id,
169+
port,
170+
config: config.clone(),
171+
sessions: sessions.clone(),
172+
buffer_pool: BUFFER_POOL.clone(),
173+
};
174+
175+
quilkin::net::io::poll::listen(worker, ws).unwrap();
176+
}
168177

169178
let socket = std::sync::Arc::new(sb.client());
170179
let msg = "recv-from";

crates/test/tests/xdp.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@
44
use qt::xdp_util::{endpoints, make_config};
55
use quilkin::{
66
filters,
7-
net::xdp::process::{
8-
self,
9-
xdp::{
10-
self,
11-
packet::net_types::{self as nt, UdpHeaders},
12-
slab::Slab,
13-
},
7+
net::io::nic::xdp::{
8+
packet::net_types::{self as nt, UdpHeaders},
9+
process,
10+
slab::Slab,
1411
},
1512
time::UtcTimestamp,
1613
};

src/config/providersv2.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,11 @@ impl Providers {
228228
self
229229
}
230230

231+
pub fn xds_endpoints(mut self, ns: Vec<tonic::transport::Endpoint>) -> Self {
232+
self.xds_endpoints = ns;
233+
self
234+
}
235+
231236
fn static_enabled(&self) -> bool {
232237
!self.endpoints.is_empty()
233238
}

src/net.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub mod phoenix;
5959

6060
pub(crate) mod error;
6161
pub(crate) mod maxmind_db;
62-
pub(crate) mod sessions;
62+
pub mod sessions;
6363

6464
pub use quilkin_xds as xds;
6565
pub use xds::net::TcpListener;

0 commit comments

Comments
 (0)