Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/xdp/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ spec:
ports:
- containerPort: 7777
hostPort: 7777
args: ["proxy", "--to", "${server_ip}:8078", "--service.udp.xdp"]
args: ["--service.udp", "--provider.static.endpoints", "${server_ip}:8078", "--service.udp.xdp"]
securityContext:
capabilities:
add:
Expand Down
2 changes: 1 addition & 1 deletion .ci/xdp/veth-integ-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
ip netns del cs || true
ip netns del proxy || true

echo "::notice file=$source,line=$LINENO::Creating network namespaces"

Check notice on line 20 in .ci/xdp/veth-integ-test.sh

View workflow job for this annotation

GitHub Actions / xdp-integration

Creating network namespaces
ip netns add cs
ip netns add proxy

echo "::notice file=$source,line=$LINENO::Adding client <-> proxy <-> server links"

Check notice on line 24 in .ci/xdp/veth-integ-test.sh

View workflow job for this annotation

GitHub Actions / xdp-integration

Adding client <-> proxy <-> server links
ip link add veth-cs type veth peer name veth-proxy

ip link set veth-cs netns cs
Expand All @@ -30,11 +30,11 @@
PROXY_IP="10.0.0.2"
OUTSIDE_IP="10.0.0.1"

echo "::notice file=$source,line=$LINENO::Adding IPs"

Check notice on line 33 in .ci/xdp/veth-integ-test.sh

View workflow job for this annotation

GitHub Actions / xdp-integration

Adding IPs
ip -n cs addr add $OUTSIDE_IP/24 dev veth-cs
ip -n proxy addr add $PROXY_IP/24 dev veth-proxy

echo "::notice file=$source,line=$LINENO::Creating network namespaces"

Check notice on line 37 in .ci/xdp/veth-integ-test.sh

View workflow job for this annotation

GitHub Actions / xdp-integration

Creating network namespaces
ip -n cs link set veth-cs up
ip -n proxy link set veth-proxy up

Expand All @@ -46,9 +46,9 @@
ip -n cs link set veth-cs xdpgeneric obj "$ROOT/crates/xdp/bin/dummy.bin" sec xdp

ip netns exec cs fortio udp-echo&
ip netns exec proxy ./target/debug/quilkin --service.udp --service.qcmp proxy --to $OUTSIDE_IP:8078 --service.udp.xdp --service.udp.xdp.network-interface veth-proxy&
ip netns exec proxy ./target/debug/quilkin --service.udp --service.qcmp --provider.static.endpoints $OUTSIDE_IP:8078 --service.udp.xdp --service.udp.xdp.network-interface veth-proxy&

echo "::notice file=$source,line=$LINENO::Launching client"

Check notice on line 51 in .ci/xdp/veth-integ-test.sh

View workflow job for this annotation

GitHub Actions / xdp-integration

Launching client
ip netns exec cs fortio load -n 10 udp://$PROXY_IP:7777 2> ./target/logs.txt
logs=$(cat ./target/logs.txt)

Expand All @@ -60,7 +60,7 @@
# We could be more strict here and require they are exactly equal, but I can't
# even consistently get that on my local machine so I doubt CI will fair better
if [[ $recv -ne "0" ]]; then
echo "::notice file=$source,line=$LINENO::Successfully sent ${send}B and received ${recv}B"

Check notice on line 63 in .ci/xdp/veth-integ-test.sh

View workflow job for this annotation

GitHub Actions / xdp-integration

Successfully sent 240B and received 144B

# Now test QCMP pings which was also enabled in the proxy
ip netns exec cs ./target/debug/quilkin qcmp ping $PROXY_IP:7600
Expand Down
23 changes: 9 additions & 14 deletions benches/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl QuilkinLoop {
}

fn spinup_inner(port: u16, endpoint: SocketAddr) -> Self {
let (shutdown_tx, shutdown_rx) =
let (shutdown_tx, mut shutdown_rx) =
quilkin::signal::channel(quilkin::signal::ShutdownKind::Benching);

let thread = spawn("quilkin", move || {
Expand All @@ -331,21 +331,16 @@ impl QuilkinLoop {
);
});

let proxy = quilkin::cli::Proxy {
port,
qcmp_port: runtime
.block_on(quilkin::test::available_addr(
quilkin::test::AddressType::Random,
))
.port(),
..<_>::default()
};

runtime.block_on(async move {
proxy
.run(config, Default::default(), None, shutdown_rx)
.await
quilkin::cli::Service::default()
.udp()
.udp_port(port)
.udp_poll()
.qcmp()
.qcmp_port(quilkin::test::available_port())
.spawn_services(&config, &shutdown_rx)
.unwrap();
let _ = shutdown_rx.changed().await;
});
});

Expand Down
4 changes: 2 additions & 2 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ steps:
entrypoint: bash
args:
- "-c"
- 'timeout --signal=INT --preserve-status 5s docker run --rm -v "/workspace/examples/proxy.yaml:/etc/quilkin/quilkin.yaml" ${_REPOSITORY}quilkin:$(make version) proxy'
- 'timeout --signal=INT --preserve-status 5s docker run --rm -v "/workspace/examples/proxy.yaml:/etc/quilkin/quilkin.yaml" ${_REPOSITORY}quilkin:$(make version) --service.udp --provider.fs --provider.fs.path=/etc/quilkin/quilkin.yaml'
id: test-quilkin-image-default-config-file
waitFor:
- build
Expand All @@ -58,7 +58,7 @@ steps:
entrypoint: bash
args:
- "-c"
- 'timeout --signal=INT --preserve-status 5s docker run -v /tmp:/etc/quilkin/ --rm ${_REPOSITORY}quilkin:$(make version) proxy --to="127.0.0.1:0"'
- 'timeout --signal=INT --preserve-status 5s docker run -v /tmp:/etc/quilkin/ --rm ${_REPOSITORY}quilkin:$(make version) --service.udp --provider.static.endpoints="127.0.0.1:0"'
id: test-quilkin-image-command-line
waitFor:
- build
Expand Down
8 changes: 7 additions & 1 deletion crates/agones/src/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ mod tests {
let client = Client::new().await;

let pods: Api<Pod> = client.namespaced_api();
let cmds = ["proxy", "--to", "127.0.0.1:0"].map(String::from).to_vec();
let cmds = [
"--service.udp",
"--provider.static.endpoints",
"127.0.0.1:0",
]
.map(String::from)
.to_vec();
let pod = Pod {
metadata: ObjectMeta {
generate_name: Some("quilkin-".into()),
Expand Down
5 changes: 4 additions & 1 deletion crates/agones/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ mod tests {
// let's allocate this specific game server
let mut t = TestHelper::default();
let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;
socket.send_to(b"ALLOCATE", gs_address).await.unwrap();
socket
.send_to(&b"ALLOCATE"[..], gs_address.parse().unwrap())
.await
.unwrap();

let response = timeout(SLOW, rx.recv())
.await
Expand Down
5 changes: 4 additions & 1 deletion crates/agones/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ mod tests {

let mut t = TestHelper::default();
let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;
socket.send_to(b"ALLOCATE", gs_address).await.unwrap();
socket
.send_to(&b"ALLOCATE"[..], gs_address.parse().unwrap())
.await
.unwrap();

let response = timeout(Duration::from_secs(30), rx.recv())
.await
Expand Down
10 changes: 8 additions & 2 deletions crates/agones/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ mod tests {
let t = TestHelper::default();
let recv = t.open_socket_and_recv_single_packet().await;
let address = crate::gameserver_address(&gs);
recv.socket.send_to(b"hello", address).await.unwrap();
recv.socket
.send_to(&b"hello"[..], address.parse().unwrap())
.await
.unwrap();

let response = timeout(Duration::from_secs(30), recv.packet_rx)
.await
Expand Down Expand Up @@ -121,7 +124,10 @@ clusters:
let t = TestHelper::default();
let recv = t.open_socket_and_recv_single_packet().await;
let address = crate::gameserver_address(&gs);
recv.socket.send_to(b"hello", address).await.unwrap();
recv.socket
.send_to(&b"hello"[..], address.parse().unwrap())
.await
.unwrap();

let response = timeout(Duration::from_secs(30), recv.packet_rx)
.await
Expand Down
84 changes: 32 additions & 52 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use quilkin::{
test::TestConfig,
};
pub use serde_json::json;
use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::sync::mpsc;

pub mod xdp_util;
Expand Down Expand Up @@ -266,13 +266,10 @@ impl Pail {
let pail = match spc.config {
PailConfig::Server(sspc) => {
let (packet_tx, packet_rx) = mpsc::channel::<String>(10);
let socket = quilkin::net::DualStackEpollSocket::new(0)
.expect("failed to create server socket");
let socket =
quilkin::net::Socket::polling_listen().expect("failed to create server socket");

let port = socket
.local_addr()
.expect("failed to bind server socket")
.port();
let port = socket.local_addr().port();

tracing::debug!(port, spc.name, "bound server socket");

Expand All @@ -285,7 +282,7 @@ impl Pail {

while num_packets > 0 {
let (size, _) = socket
.recv_from(&mut buf)
.recv_from(&mut *buf)
.await
.expect("failed to receive packet");
received += size;
Expand Down Expand Up @@ -393,10 +390,7 @@ impl Pail {
let (shutdown, shutdown_rx) =
quilkin::signal::channel(quilkin::signal::ShutdownKind::Testing);

let port = quilkin::net::socket_port(
&quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
);

let port = quilkin::test::available_port();
let config_path = path.clone();
let config = Arc::new(Config::default_agent());
config.dyn_cfg.id.store(Arc::new(spc.name.into()));
Expand Down Expand Up @@ -431,15 +425,6 @@ impl Pail {
})
}
PailConfig::Proxy(ppc) => {
let socket = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket");
let qcmp =
quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
let qcmp_port = quilkin::net::socket_port(&qcmp);
let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket");
let phoenix_port = phoenix.port();

let port = quilkin::net::socket_port(&socket);

let management_servers = spc
.dependencies
.iter()
Expand Down Expand Up @@ -499,31 +484,26 @@ impl Pail {
}

config.dyn_cfg.id.store(Arc::new(spc.name.into()));
let pconfig = config.clone();

let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel();

let task = tokio::spawn(async move {
components::proxy::Proxy {
num_workers: NonZeroUsize::new(1).unwrap(),
management_servers,
socket: Some(socket),
qcmp,
phoenix,
notifier: Some(rttx),
..Default::default()
}
.run(
RunArgs {
config: pconfig,
ready: Default::default(),
shutdown_rx,
},
Some(tx),
)
.await
});

let qcmp_port = quilkin::test::available_port();
let phoenix_port = quilkin::test::available_port();
let port = quilkin::test::available_port();

let _provider_task = quilkin::config::providersv2::Providers::default()
.xds_endpoints(management_servers)
.spawn_providers(&config, <_>::default(), None);

let task = quilkin::cli::Service::default()
.udp()
.udp_port(port)
.udp_poll()
.qcmp()
.qcmp_port(qcmp_port)
.phoenix()
.phoenix_port(phoenix_port)
.spawn_services(&config, &shutdown_rx)
.unwrap();

let _ = tx.send(());
rx = Some(orx);

Self::Proxy(ProxyPail {
Expand All @@ -533,7 +513,7 @@ impl Pail {
shutdown,
task,
config,
delta_applies: Some(rtrx),
delta_applies: None,
})
}
};
Expand Down Expand Up @@ -682,20 +662,20 @@ impl Sandbox {

#[inline]
pub fn socket(&self) -> (socket2::Socket, SocketAddr) {
let socket = quilkin::net::raw_socket_with_reuse(0).unwrap();
let port = quilkin::net::socket_port(&socket);
let socket = quilkin::net::io::SystemSocket::listen().unwrap();
let port = socket.port();

(
socket,
socket.into_inner(),
SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, port)),
)
}

/// Creates an ephemeral socket that can be used to send messages to sandbox
/// pails
#[inline]
pub fn client(&self) -> quilkin::net::DualStackEpollSocket {
quilkin::net::DualStackEpollSocket::new(0).unwrap()
pub fn client(&self) -> quilkin::net::Socket {
quilkin::net::Socket::polling_listen().unwrap()
}

/// Sleeps for the specified number of milliseconds
Expand Down
2 changes: 1 addition & 1 deletion crates/test/src/xdp_util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use quilkin::net::xdp::process;
use quilkin::net::io::nic::xdp::process;
use xdp::{Packet, packet::net_types::UdpHeaders};

#[inline]
Expand Down
8 changes: 4 additions & 4 deletions crates/test/tests/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ trace_test!(relay_routing, {
msg.extend_from_slice(&token.inner);

tracing::info!(%token, "sending packet");
client.send_to(&msg, &proxy_address).await.unwrap();
client.send_to(&*msg, proxy_address).await.unwrap();

assert_eq!(
"hello",
Expand All @@ -124,7 +124,7 @@ trace_test!(relay_routing, {
tracing::info!(%token, "sending bad packet");
// send an invalid packet
client
.send_to(b"hello\xFF\xFF\xFF", &proxy_address)
.send_to(&b"hello\xFF\xFF\xFF"[..], proxy_address)
.await
.unwrap();

Expand Down Expand Up @@ -308,7 +308,7 @@ trace_test!(filter_update, {
msg.extend_from_slice(&token);

tracing::info!(len = token.len(), "sending packet");
client.send_to(&msg, &proxy_address).await.unwrap();
client.send_to(&*msg, proxy_address).await.unwrap();

tracing::info!(len = token.len(), "received packet");
assert_eq!(
Expand All @@ -320,7 +320,7 @@ trace_test!(filter_update, {
// send an invalid packet
msg.truncate(5);
msg.extend((0..token.len()).map(|_| b'b'));
client.send_to(&msg, &proxy_address).await.unwrap();
client.send_to(msg, proxy_address).await.unwrap();

sandbox.expect_timeout(50, server_rx.recv()).await;
tracing::info!(len = token.len(), "didn't receive bad packet");
Expand Down
Loading
Loading