diff --git a/.ci/xdp/integration-test.sh b/.ci/xdp/integration-test.sh index 5868958c5d..8d1da464f8 100755 --- a/.ci/xdp/integration-test.sh +++ b/.ci/xdp/integration-test.sh @@ -34,7 +34,7 @@ spec: ports: - containerPort: 7777 hostPort: 7777 - args: ["proxy", "--to", "${server_ip}:8078", "--service.udp.xdp"] + args: ["--service.udp", "--provider.static.endpoints", "${server_ip}:8078", "--service.udp.xdp"] securityContext: capabilities: add: diff --git a/.ci/xdp/veth-integ-test.sh b/.ci/xdp/veth-integ-test.sh index dd4cf76185..34a5c8afcf 100755 --- a/.ci/xdp/veth-integ-test.sh +++ b/.ci/xdp/veth-integ-test.sh @@ -46,7 +46,7 @@ echo "Adding dummy program" ip -n cs link set veth-cs xdpgeneric obj "$ROOT/crates/xdp/bin/dummy.bin" sec xdp ip netns exec cs fortio udp-echo& -ip netns exec proxy ./target/debug/quilkin --service.udp --service.qcmp proxy --to $OUTSIDE_IP:8078 --service.udp.xdp --service.udp.xdp.network-interface veth-proxy& +ip netns exec proxy ./target/debug/quilkin --service.udp --service.qcmp --provider.static.endpoints $OUTSIDE_IP:8078 --service.udp.xdp --service.udp.xdp.network-interface veth-proxy& echo "::notice file=$source,line=$LINENO::Launching client" ip netns exec cs fortio load -n 10 udp://$PROXY_IP:7777 2> ./target/logs.txt diff --git a/benches/shared.rs b/benches/shared.rs index 368e046164..7843b03b56 100644 --- a/benches/shared.rs +++ b/benches/shared.rs @@ -319,7 +319,7 @@ impl QuilkinLoop { } fn spinup_inner(port: u16, endpoint: SocketAddr) -> Self { - let (shutdown_tx, shutdown_rx) = + let (shutdown_tx, mut shutdown_rx) = quilkin::signal::channel(quilkin::signal::ShutdownKind::Benching); let thread = spawn("quilkin", move || { @@ -331,21 +331,16 @@ impl QuilkinLoop { ); }); - let proxy = quilkin::cli::Proxy { - port, - qcmp_port: runtime - .block_on(quilkin::test::available_addr( - quilkin::test::AddressType::Random, - )) - .port(), - ..<_>::default() - }; - runtime.block_on(async move { - proxy - .run(config, Default::default(), None, shutdown_rx) - .await + quilkin::cli::Service::default() + .udp() + .udp_port(port) + .udp_poll() + .qcmp() + .qcmp_port(quilkin::test::available_port()) + .spawn_services(&config, &shutdown_rx) .unwrap(); + let _ = shutdown_rx.changed().await; }); }); diff --git a/cloudbuild.yaml b/cloudbuild.yaml index df3f0a0ab9..47ec3b8ca8 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -47,7 +47,7 @@ steps: entrypoint: bash args: - "-c" - - 'timeout --signal=INT --preserve-status 5s docker run --rm -v "/workspace/examples/proxy.yaml:/etc/quilkin/quilkin.yaml" ${_REPOSITORY}quilkin:$(make version) proxy' + - 'timeout --signal=INT --preserve-status 5s docker run --rm -v "/workspace/examples/proxy.yaml:/etc/quilkin/quilkin.yaml" ${_REPOSITORY}quilkin:$(make version) --service.udp --provider.fs --provider.fs.path=/etc/quilkin/quilkin.yaml' id: test-quilkin-image-default-config-file waitFor: - build @@ -58,7 +58,7 @@ steps: entrypoint: bash args: - "-c" - - 'timeout --signal=INT --preserve-status 5s docker run -v /tmp:/etc/quilkin/ --rm ${_REPOSITORY}quilkin:$(make version) proxy --to="127.0.0.1:0"' + - 'timeout --signal=INT --preserve-status 5s docker run -v /tmp:/etc/quilkin/ --rm ${_REPOSITORY}quilkin:$(make version) --service.udp --provider.static.endpoints="127.0.0.1:0"' id: test-quilkin-image-command-line waitFor: - build diff --git a/crates/agones/src/pod.rs b/crates/agones/src/pod.rs index f30abf7c72..d49060372b 100644 --- a/crates/agones/src/pod.rs +++ b/crates/agones/src/pod.rs @@ -31,7 +31,13 @@ mod tests { let client = Client::new().await; let pods: Api = client.namespaced_api(); - let cmds = ["proxy", "--to", "127.0.0.1:0"].map(String::from).to_vec(); + let cmds = [ + "--service.udp", + "--provider.static.endpoints", + "127.0.0.1:0", + ] + .map(String::from) + .to_vec(); let pod = Pod { metadata: ObjectMeta { generate_name: Some("quilkin-".into()), diff --git a/crates/agones/src/provider.rs b/crates/agones/src/provider.rs index 72275d6d41..a4fb1b1787 100644 --- a/crates/agones/src/provider.rs +++ b/crates/agones/src/provider.rs @@ -90,7 +90,10 @@ mod tests { // let's allocate this specific game server let mut t = TestHelper::default(); let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; - socket.send_to(b"ALLOCATE", gs_address).await.unwrap(); + socket + .send_to(&b"ALLOCATE"[..], gs_address.parse().unwrap()) + .await + .unwrap(); let response = timeout(SLOW, rx.recv()) .await diff --git a/crates/agones/src/relay.rs b/crates/agones/src/relay.rs index d638303373..6207a1194f 100644 --- a/crates/agones/src/relay.rs +++ b/crates/agones/src/relay.rs @@ -100,7 +100,10 @@ mod tests { let mut t = TestHelper::default(); let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; - socket.send_to(b"ALLOCATE", gs_address).await.unwrap(); + socket + .send_to(&b"ALLOCATE"[..], gs_address.parse().unwrap()) + .await + .unwrap(); let response = timeout(Duration::from_secs(30), rx.recv()) .await diff --git a/crates/agones/src/sidecar.rs b/crates/agones/src/sidecar.rs index 2f9275f409..964c3c9f7f 100644 --- a/crates/agones/src/sidecar.rs +++ b/crates/agones/src/sidecar.rs @@ -47,7 +47,10 @@ mod tests { let t = TestHelper::default(); let recv = t.open_socket_and_recv_single_packet().await; let address = crate::gameserver_address(&gs); - recv.socket.send_to(b"hello", address).await.unwrap(); + recv.socket + .send_to(&b"hello"[..], address.parse().unwrap()) + .await + .unwrap(); let response = timeout(Duration::from_secs(30), recv.packet_rx) .await @@ -121,7 +124,10 @@ clusters: let t = TestHelper::default(); let recv = t.open_socket_and_recv_single_packet().await; let address = crate::gameserver_address(&gs); - recv.socket.send_to(b"hello", address).await.unwrap(); + recv.socket + .send_to(&b"hello"[..], address.parse().unwrap()) + .await + .unwrap(); let response = timeout(Duration::from_secs(30), recv.packet_rx) .await diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs index 9794c27c63..5bfd868858 100644 --- a/crates/test/src/lib.rs +++ b/crates/test/src/lib.rs @@ -8,7 +8,7 @@ use quilkin::{ test::TestConfig, }; pub use serde_json::json; -use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tokio::sync::mpsc; pub mod xdp_util; @@ -266,13 +266,10 @@ impl Pail { let pail = match spc.config { PailConfig::Server(sspc) => { let (packet_tx, packet_rx) = mpsc::channel::(10); - let socket = quilkin::net::DualStackEpollSocket::new(0) - .expect("failed to create server socket"); + let socket = + quilkin::net::Socket::polling_listen().expect("failed to create server socket"); - let port = socket - .local_addr() - .expect("failed to bind server socket") - .port(); + let port = socket.local_addr().port(); tracing::debug!(port, spc.name, "bound server socket"); @@ -285,7 +282,7 @@ impl Pail { while num_packets > 0 { let (size, _) = socket - .recv_from(&mut buf) + .recv_from(&mut *buf) .await .expect("failed to receive packet"); received += size; @@ -393,10 +390,7 @@ impl Pail { let (shutdown, shutdown_rx) = quilkin::signal::channel(quilkin::signal::ShutdownKind::Testing); - let port = quilkin::net::socket_port( - &quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"), - ); - + let port = quilkin::test::available_port(); let config_path = path.clone(); let config = Arc::new(Config::default_agent()); config.dyn_cfg.id.store(Arc::new(spc.name.into())); @@ -431,15 +425,6 @@ impl Pail { }) } PailConfig::Proxy(ppc) => { - let socket = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket"); - let qcmp = - quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"); - let qcmp_port = quilkin::net::socket_port(&qcmp); - let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket"); - let phoenix_port = phoenix.port(); - - let port = quilkin::net::socket_port(&socket); - let management_servers = spc .dependencies .iter() @@ -499,31 +484,26 @@ impl Pail { } config.dyn_cfg.id.store(Arc::new(spc.name.into())); - let pconfig = config.clone(); - - let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel(); - - let task = tokio::spawn(async move { - components::proxy::Proxy { - num_workers: NonZeroUsize::new(1).unwrap(), - management_servers, - socket: Some(socket), - qcmp, - phoenix, - notifier: Some(rttx), - ..Default::default() - } - .run( - RunArgs { - config: pconfig, - ready: Default::default(), - shutdown_rx, - }, - Some(tx), - ) - .await - }); - + let qcmp_port = quilkin::test::available_port(); + let phoenix_port = quilkin::test::available_port(); + let port = quilkin::test::available_port(); + + let _provider_task = quilkin::config::providersv2::Providers::default() + .xds_endpoints(management_servers) + .spawn_providers(&config, <_>::default(), None); + + let task = quilkin::cli::Service::default() + .udp() + .udp_port(port) + .udp_poll() + .qcmp() + .qcmp_port(qcmp_port) + .phoenix() + .phoenix_port(phoenix_port) + .spawn_services(&config, &shutdown_rx) + .unwrap(); + + let _ = tx.send(()); rx = Some(orx); Self::Proxy(ProxyPail { @@ -533,7 +513,7 @@ impl Pail { shutdown, task, config, - delta_applies: Some(rtrx), + delta_applies: None, }) } }; @@ -682,11 +662,11 @@ impl Sandbox { #[inline] pub fn socket(&self) -> (socket2::Socket, SocketAddr) { - let socket = quilkin::net::raw_socket_with_reuse(0).unwrap(); - let port = quilkin::net::socket_port(&socket); + let socket = quilkin::net::io::SystemSocket::listen().unwrap(); + let port = socket.port(); ( - socket, + socket.into_inner(), SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, port)), ) } @@ -694,8 +674,8 @@ impl Sandbox { /// Creates an ephemeral socket that can be used to send messages to sandbox /// pails #[inline] - pub fn client(&self) -> quilkin::net::DualStackEpollSocket { - quilkin::net::DualStackEpollSocket::new(0).unwrap() + pub fn client(&self) -> quilkin::net::Socket { + quilkin::net::Socket::polling_listen().unwrap() } /// Sleeps for the specified number of milliseconds diff --git a/crates/test/src/xdp_util.rs b/crates/test/src/xdp_util.rs index 9b830be58e..96e3565cf8 100644 --- a/crates/test/src/xdp_util.rs +++ b/crates/test/src/xdp_util.rs @@ -1,4 +1,4 @@ -use quilkin::net::xdp::process; +use quilkin::net::io::nic::xdp::process; use xdp::{Packet, packet::net_types::UdpHeaders}; #[inline] diff --git a/crates/test/tests/mesh.rs b/crates/test/tests/mesh.rs index 97808734f6..15f21d6699 100644 --- a/crates/test/tests/mesh.rs +++ b/crates/test/tests/mesh.rs @@ -112,7 +112,7 @@ trace_test!(relay_routing, { msg.extend_from_slice(&token.inner); tracing::info!(%token, "sending packet"); - client.send_to(&msg, &proxy_address).await.unwrap(); + client.send_to(&*msg, proxy_address).await.unwrap(); assert_eq!( "hello", @@ -124,7 +124,7 @@ trace_test!(relay_routing, { tracing::info!(%token, "sending bad packet"); // send an invalid packet client - .send_to(b"hello\xFF\xFF\xFF", &proxy_address) + .send_to(&b"hello\xFF\xFF\xFF"[..], proxy_address) .await .unwrap(); @@ -308,7 +308,7 @@ trace_test!(filter_update, { msg.extend_from_slice(&token); tracing::info!(len = token.len(), "sending packet"); - client.send_to(&msg, &proxy_address).await.unwrap(); + client.send_to(&*msg, proxy_address).await.unwrap(); tracing::info!(len = token.len(), "received packet"); assert_eq!( @@ -320,7 +320,7 @@ trace_test!(filter_update, { // send an invalid packet msg.truncate(5); msg.extend((0..token.len()).map(|_| b'b')); - client.send_to(&msg, &proxy_address).await.unwrap(); + client.send_to(msg, proxy_address).await.unwrap(); sandbox.expect_timeout(50, server_rx.recv()).await; tracing::info!(len = token.len(), "didn't receive bad packet"); diff --git a/crates/test/tests/proxy.rs b/crates/test/tests/proxy.rs index cb44f74ebd..2882c3ad1a 100644 --- a/crates/test/tests/proxy.rs +++ b/crates/test/tests/proxy.rs @@ -3,7 +3,7 @@ #![cfg(debug_assertions)] use qt::*; -use quilkin::{components::proxy, net, test::TestConfig}; +use quilkin::test::TestConfig; trace_test!(server, { let mut sc = qt::sandbox_config!(); @@ -24,7 +24,7 @@ trace_test!(server, { let client = sb.client(); - client.send_to(msg.as_bytes(), &addr).await.unwrap(); + client.send_to(msg.as_bytes(), addr).await.unwrap(); assert_eq!( msg, sb.timeout(100, server1_rx.recv()) @@ -53,7 +53,7 @@ trace_test!(client, { let msg = "hello"; tracing::debug!(%local_addr, "sending packet"); - client.send_to(msg.as_bytes(), &local_addr).await.unwrap(); + client.send_to(msg.as_bytes(), local_addr).await.unwrap(); assert_eq!(msg, sb.timeout(100, dest_rx.recv()).await.unwrap(),); }); @@ -75,7 +75,7 @@ trace_test!(with_filter, { let client = sb.client(); let msg = "hello"; - client.send_to(msg.as_bytes(), &local_addr).await.unwrap(); + client.send_to(msg.as_bytes(), local_addr).await.unwrap(); // search for the filter strings. let result = sb.timeout(100, rx.recv()).await.unwrap(); @@ -99,22 +99,25 @@ trace_test!(uring_receiver, { let socket = sb.client(); let (ws, addr) = sb.socket(); - - let pending_sends = net::queue(1).unwrap(); - - // we'll test a single DownstreamReceiveWorkerConfig - proxy::packet_router::DownstreamReceiveWorkerConfig { - worker_id: 1, - port: addr.port(), - config: config.clone(), - buffer_pool: quilkin::test::BUFFER_POOL.clone(), - sessions: proxy::SessionPool::new( - config, - vec![pending_sends.0.clone()], - BUFFER_POOL.clone(), - ), - } - .spawn(pending_sends) + let backend = quilkin::net::io::Backend::Polling; + let pending_sends = quilkin::net::packet::queue(1, backend).unwrap(); + + // we'll test a single Listener + quilkin::net::io::completion::listen( + quilkin::net::io::Listener { + worker_id: 1, + port: addr.port(), + config: config.clone(), + buffer_pool: quilkin::test::BUFFER_POOL.clone(), + sessions: quilkin::net::sessions::SessionPool::new( + config, + vec![pending_sends.0.clone()], + BUFFER_POOL.clone(), + backend, + ), + }, + pending_sends, + ) .expect("failed to spawn task"); // Drop the socket, otherwise it can @@ -144,31 +147,37 @@ trace_test!( .unwrap() .modify(|clusters| clusters.insert_default([endpoint.into()].into())); + let backend = quilkin::net::io::Backend::Polling; let pending_sends: Vec<_> = [ - net::queue(1).unwrap(), - net::queue(1).unwrap(), - net::queue(1).unwrap(), + quilkin::net::packet::queue(1, backend).unwrap(), + quilkin::net::packet::queue(1, backend).unwrap(), + quilkin::net::packet::queue(1, backend).unwrap(), ] .into_iter() .collect(); - let sessions = proxy::SessionPool::new( + let sessions = quilkin::net::sessions::SessionPool::new( config.clone(), pending_sends.iter().map(|ps| ps.0.clone()).collect(), BUFFER_POOL.clone(), + backend, ); const WORKER_COUNT: usize = 3; let (socket, addr) = sb.socket(); - proxy::packet_router::spawn_receivers( - config, - socket, - pending_sends, - &sessions, - BUFFER_POOL.clone(), - ) - .unwrap(); + let port = quilkin::net::io::SystemSocket::new(socket).port(); + for (worker_id, ws) in pending_sends.into_iter().enumerate() { + let worker = quilkin::net::io::Listener { + worker_id, + port, + config: config.clone(), + sessions: sessions.clone(), + buffer_pool: BUFFER_POOL.clone(), + }; + + quilkin::net::io::poll::listen(worker, ws).unwrap(); + } let socket = std::sync::Arc::new(sb.client()); let msg = "recv-from"; diff --git a/crates/test/tests/xdp.rs b/crates/test/tests/xdp.rs index 9bcb196f61..1aae1010a4 100644 --- a/crates/test/tests/xdp.rs +++ b/crates/test/tests/xdp.rs @@ -4,13 +4,10 @@ use qt::xdp_util::{endpoints, make_config}; use quilkin::{ filters, - net::xdp::process::{ - self, - xdp::{ - self, - packet::net_types::{self as nt, UdpHeaders}, - slab::Slab, - }, + net::io::nic::xdp::{ + packet::net_types::{self as nt, UdpHeaders}, + process, + slab::Slab, }, time::UtcTimestamp, }; diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index a4020f8f38..6e7f693f88 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -30,11 +30,14 @@ --- +- [Providers]() + - [Agones](./config/providers/agones.md) + - [Filesystem](./config/providers/filesystem.md) + +--- + - [Control Plane](./services/xds.md) - [Metrics](./services/xds/metrics.md) - - [Providers]() - - [Agones](./services/xds/providers/agones.md) - - [Filesystem](./services/xds/providers/filesystem.md) - [Protobuf Reference](./services/xds/proto/index.md) --- diff --git a/docs/src/config/providers/agones.md b/docs/src/config/providers/agones.md new file mode 100644 index 0000000000..05392aeb2e --- /dev/null +++ b/docs/src/config/providers/agones.md @@ -0,0 +1,83 @@ +# Agones xDS Provider + +The [Agones] xDS Provider is built to simplify Quilkin integration with Agones +game server hosting on top of [Kubernetes](https://kubernetes.io). + +This provider watches for changes in Agones +[`GameServer` resources](https://agones.dev/site/docs/getting-started/create-gameserver/) in a cluster, and +utilises that information to provide [Endpoint][Endpoints] information to connected Quilkin proxies. + +To view all the options for the agones provider subcommand, run: +```shell +$ quilkin manage agones --help +{{#include ../../../../../target/quilkin.manage.agones.commands}} +``` + +> Currently, the Agones provider can only discover resources within the cluster it is running in. + +## Endpoint Configuration + +This provider watches the Kubernetes clusters for `Allocated` +[Agones GameServers](https://agones.dev/site/docs/reference/gameserver/#gameserver-state-diagram) +and exposes their IP address and Port as [Endpoints] to any connected Quilkin proxies. + +> Since an Agones GameServer can have multiple ports exposed, if multiple ports are in +> use, the server will pick the first port in the port list. + +By default the Agones xDS provider will look in the `default` namespace for any `GameServer` resources, but it can be +configured via the `--gameservers-namespace` argument. + +### Access Tokens + +The set of [access tokens](../../proxy.md#specialist-endpoint-metadata) for the associated Endpoint can be +set by adding a comma separated standard base64 encoded strings. This must be added under an annotation +`quilkin.dev/tokens` in the +[GameServer](https://agones.dev/site/docs/reference/agones_crd_api_reference/#agones.dev/v1.GameServer)'s metadata. + +For example: + +```yaml +annotations: + # Sets two tokens for the corresponding endpoint with values 1x7ijy6 and 8gj3v2i respectively. + quilkin.dev/tokens: MXg3aWp5Ng==,OGdqM3YyaQ== +``` + +## Filter Configuration + +The Agones provider watches for a singular [`ConfigMap`](https://kubernetes.io/docs/concepts/configuration/configmap/) +that has the label of `quilkin.dev/configmap: "true"`, and any changes that happen to it, and use its contents to +send [Filter] configuration to any connected Quilkin proxies. + +The `ConfigMap` contents should be a valid Quilkin [file configuration][configuration], but with no +Endpoint data. + +For example: + +```yaml +{{#include ../../../../../examples/agones-xonotic-xds/xds-control-plane.yaml:config-map}} +``` + +By default the Agones xDS provider will look in the `default` namespace for this `ConfigMap`, but it can be +configured via the `--config-namespace` argument. + +## Usage + +As an example, the following runs the server with subcommnad `manage agones` against a cluster (using default +kubeconfig authentication) where Quilkin pods run in the `quilkin` namespace and `GameServer` pods run in the +`gameservers` namespace: + +```sh +quilkin manage agones --config-namespace quilkin --gameservers-namespace gameservers +``` + +For a full referenmce of deploying this provider in a Kubernetes cluster, with appropriate [Deployments], [Services], +and [RBAC] Rules, there is an [Agones, xDS and Xonotic example][example]. + +[Agones]: https://agones.dev +[Endpoints]: ../../proxy.md#endpoints +[Deployments]: https://kubernetes.io/docs/concepts/workloads/controllers/deployment/ +[Services]: https://kubernetes.io/docs/concepts/services-networking/service/ +[RBAC]: https://kubernetes.io/docs/reference/access-authn-authz/rbac/ +[example]: https://github.com/googleforgames/quilkin/tree/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds +[Filter]: ../../../services/proxy/filters.md +[configuration]: ../../../services/proxy/configuration.md \ No newline at end of file diff --git a/docs/src/config/providers/filesystem.md b/docs/src/config/providers/filesystem.md new file mode 100644 index 0000000000..c5d417e2a3 --- /dev/null +++ b/docs/src/config/providers/filesystem.md @@ -0,0 +1,45 @@ +# Filesystem xDS Provider + +The filesystem provider watches a configuration file on disk and sends updates to proxies whenever that file changes. + +To view all the options for the file provider subcommand, run: +```shell +$ quilkin manage agones --help +{{#include ../../../../../target/quilkin.manage.file.commands}} +``` + +For example: +```sh +quilkin manage file quilkin.yaml +``` + +We run this on port 1800, in this example, in case you are running this locally, and the +default port is taken up by an existing Quilkin proxy. + +After running this command, any proxy that connects to port 18000 will receive updates as configured in `config.yaml` +file. + +You can find the configuration file schema in [Configuration File][configuration]. + +Example: + +```rust +# let yaml = " +version: v1alpha1 +filters: + - name: quilkin.filters.debug.v1alpha1.Debug + config: + id: hello +clusters: + - endpoints: + - address: 123.0.0.1:29 + metadata: + 'quilkin.dev': + tokens: + - 'MXg3aWp5Ng==' +# "; +# let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); +# assert_eq!(config.filters().unwrap().load().len(), 1); +``` + +[configuration]: ../../../services/proxy/configuration.md diff --git a/docs/src/deployment/quickstarts/netcat.md b/docs/src/deployment/quickstarts/netcat.md index 399cb1476f..b11d617961 100644 --- a/docs/src/deployment/quickstarts/netcat.md +++ b/docs/src/deployment/quickstarts/netcat.md @@ -25,7 +25,7 @@ Next let's configure Quilkin in proxy mode, with a static configuration that poi UDP echo service we just started. ```shell -quilkin proxy --to 127.0.0.1:8080 +quilkin --service.udp --provider.static.endpoints 127.0.0.1:8080 ``` This configuration will start Quilkin on the [default proxy port](../../services/proxy.md), and it will diff --git a/docs/src/services/proxy/configuration.md b/docs/src/services/proxy/configuration.md index e29ca35f44..8b1d919270 100644 --- a/docs/src/services/proxy/configuration.md +++ b/docs/src/services/proxy/configuration.md @@ -22,7 +22,7 @@ endpoint configuration to specify two endpoints with `token` metadata attached t {{#include ../../../../examples/proxy.yaml:17:100}} ``` -This is a great use of a static configuration file, as we only get a singular `--to` endpoint address via the +This is a great use of a static configuration file, as we only get a singular `--provider.static.endpoints` endpoint address via the command line arguments. We can also configure [Filters](./filters.md) via the configuration file. See that section for documentation. diff --git a/examples/agones-xonotic-sidecar/sidecar.yaml b/examples/agones-xonotic-sidecar/sidecar.yaml index a09addebc7..4ad402a7a6 100644 --- a/examples/agones-xonotic-sidecar/sidecar.yaml +++ b/examples/agones-xonotic-sidecar/sidecar.yaml @@ -42,7 +42,10 @@ spec: image: us-docker.pkg.dev/agones-images/examples/xonotic-example:1.2 - name: quilkin image: us-docker.pkg.dev/quilkin/release/quilkin:0.9.0 - args: ["proxy", "--port", "26001", "--to", "127.0.0.1:26000"] + args: + - --service.udp + - --service.udp.port=26001 + - --provider.static.endpoints=127.0.0.1:26000 livenessProbe: httpGet: path: /live diff --git a/src/cli.rs b/src/cli.rs index f13f9fd7ad..b8e6232a2a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -26,10 +26,9 @@ pub use self::{ agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, - proxy::Proxy, qcmp::Qcmp, relay::Relay, - service::{Finalizer, Service}, + service::{Finalizer, Service, XdpOptions}, }; macro_rules! define_port { @@ -45,7 +44,6 @@ macro_rules! define_port { pub mod agent; pub mod generate_config_schema; pub mod manage; -pub mod proxy; pub mod qcmp; pub mod relay; mod service; @@ -191,7 +189,6 @@ pub enum Commands { Manage(Manage), #[clap(subcommand)] Qcmp(Qcmp), - Proxy(Proxy), Relay(Relay), } @@ -261,14 +258,6 @@ impl Cli { agent.run(locality, config, old_ready, shutdown_rx).await } - Some(Commands::Proxy(runner)) => { - let old_ready = proxy::Ready { - xds_is_healthy: parking_lot::RwLock::from(Some(ready.clone())).into(), - ..<_>::default() - }; - runner.run(config, old_ready, None, shutdown_rx).await - } - Some(Commands::Manage(manager)) => { let old_ready = agent::Ready { provider_is_healthy: ready.clone(), diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs deleted file mode 100644 index 762e7d9623..0000000000 --- a/src/cli/proxy.rs +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::net::SocketAddr; -use tonic::transport::Endpoint; - -#[cfg(doc)] -use crate::filters::FilterFactory; - -use crate::signal::ShutdownRx; - -pub use crate::{cli::service::XdpOptions, components::proxy::Ready}; - -define_port!(7777); - -const QCMP_PORT: u16 = 7600; - -/// Run Quilkin as a UDP reverse proxy. -#[derive(clap::Args, Clone, Debug)] -pub struct Proxy { - /// One or more `quilkin manage` endpoints to listen to for config changes - #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER", conflicts_with("to"))] - pub management_server: Vec, - /// The remote URL or local file path to retrieve the Maxmind database. - #[clap(long, env)] - pub mmdb: Option, - /// The port to listen on. - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)] - pub port: u16, - /// The port to listen on. - #[clap(short, long, env = "QUILKIN_QCMP_PORT", default_value_t = QCMP_PORT)] - pub qcmp_port: u16, - /// One or more socket addresses to forward packets to. - #[clap(long, env = "QUILKIN_DEST")] - pub to: Vec, - /// Assigns dynamic tokens to each address in the `--to` argument - /// - /// Format is `:` - #[clap(long, env = "QUILKIN_DEST_TOKENS", requires("to"))] - pub to_tokens: Option, - /// The interval in seconds at which the relay will send a discovery request - /// to an management server after receiving no updates. - #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")] - pub idle_request_interval_secs: Option, - /// Number of worker threads used to process packets. - /// - /// If not specified defaults to number of cpus. Has no effect if XDP is used, - /// as the number of workers is always the same as the NIC queue size. - #[clap(short, long, env = "QUILKIN_WORKERS")] - pub workers: Option, - #[clap(flatten)] - pub xdp_opts: XdpOptions, - #[clap(long = "termination-timeout")] - pub termination_timeout: Option, -} - -impl Default for Proxy { - fn default() -> Self { - Self { - management_server: <_>::default(), - mmdb: <_>::default(), - port: PORT, - qcmp_port: QCMP_PORT, - to: <_>::default(), - to_tokens: None, - idle_request_interval_secs: None, - workers: None, - xdp_opts: Default::default(), - termination_timeout: None, - } - } -} - -impl Proxy { - /// Start and run a proxy. - #[tracing::instrument(skip_all)] - pub async fn run( - self, - config: std::sync::Arc, - ready: Ready, - initialized: Option>, - shutdown_rx: ShutdownRx, - ) -> crate::Result<()> { - tracing::info!(port = self.port, proxy_id = config.id(), "Starting proxy"); - - // The number of worker tasks to spawn. Each task gets a dedicated queue to - // consume packets off. - let num_workers = self.workers.unwrap_or_else(|| { - std::num::NonZeroUsize::new(num_cpus::get()) - .expect("num_cpus returned 0, which should be impossible") - }); - - let socket = crate::net::raw_socket_with_reuse(self.port)?; - let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?; - let phoenix = crate::net::TcpListener::bind(Some(self.qcmp_port))?; - - let to_tokens = self - .to_tokens - .map(|tt| { - let Some((count, length)) = tt.split_once(':') else { - eyre::bail!("--to-tokens `{tt}` is invalid, it must have a `:` separator") - }; - - let count = count.parse()?; - let length = length.parse()?; - - Ok(crate::components::proxy::ToTokens { count, length }) - }) - .transpose()?; - - crate::components::proxy::Proxy { - management_servers: self.management_server, - mmdb: self.mmdb, - to: self.to, - to_tokens, - num_workers, - socket: Some(socket), - qcmp, - phoenix, - notifier: None, - xdp: self.xdp_opts, - termination_timeout: self.termination_timeout, - } - .run( - crate::components::RunArgs { - config, - ready, - shutdown_rx, - }, - initialized, - ) - .await - } -} diff --git a/src/cli/service.rs b/src/cli/service.rs index 5101ab908d..8834e84731 100644 --- a/src/cli/service.rs +++ b/src/cli/service.rs @@ -1,6 +1,6 @@ use std::{future::Future, sync::Arc}; -use crate::{components::proxy::SessionPool, config::Config}; +use crate::{config::Config, net::io::Backend}; #[derive(Debug, clap::Parser)] #[command(next_help_heading = "Service Options")] @@ -64,6 +64,22 @@ pub struct Service { default_value_t = 7777 )] udp_port: u16, + /// Requires Quilkin to use a polling based I/O interface (e.g. epoll). + #[clap( + long = "service.udp.poll", + env = "QUILKIN_SERVICE_UDP_POLL", + conflicts_with("force_xdp"), + conflicts_with("udp_completion_enabled") + )] + udp_poll_enabled: bool, + /// Requires Quilkin to use a completion based I/O interface (e.g. io-uring). + #[clap( + long = "service.udp.completion", + env = "QUILKIN_SERVICE_UDP_COMPLETION", + conflicts_with("force_xdp"), + conflicts_with("udp_poll_enabled") + )] + udp_completion_enabled: bool, #[clap(flatten)] pub xdp: XdpOptions, /// Amount of UDP workers to run. @@ -130,6 +146,8 @@ impl Default for Service { qcmp_enabled: <_>::default(), qcmp_port: 7600, udp_enabled: <_>::default(), + udp_poll_enabled: <_>::default(), + udp_completion_enabled: <_>::default(), udp_port: 7777, udp_workers: std::num::NonZeroUsize::new(num_cpus::get()).unwrap(), xds_enabled: <_>::default(), @@ -157,6 +175,20 @@ impl Service { self } + pub fn udp_poll(mut self) -> Self { + self.udp_poll_enabled = true; + self + } + + pub fn udp_completion(mut self) -> Self { + self.udp_completion_enabled = true; + self + } + + pub fn get_udp_port(&self) -> u16 { + self.udp_port + } + /// Enables the QCMP service. pub fn qcmp(mut self) -> Self { self.qcmp_enabled = true; @@ -249,19 +281,23 @@ impl Service { let mut shutdown_rx = shutdown_rx.clone(); let mds_task = self.publish_mds(config)?; let (phoenix_task, phoenix_finalizer) = self.publish_phoenix(config)?; - // We need to call this before qcmp since if we use XDP we handle QCMP - // internally without a separate task - let (udp_task, finalizer, session_pool) = self.publish_udp(config)?; - let qcmp_task = self.publish_qcmp(&shutdown_rx)?; + let (udp_task, finalizer, session_pool) = self.listen_udp(config, &shutdown_rx)?; let xds_task = self.publish_xds(config)?; + tracing::info!(services=?[ + self.udp_enabled.then_some("udp"), + self.qcmp_enabled.then_some("qcmp"), + self.phoenix_enabled.then_some("phoenix"), + self.xds_enabled.then_some("xds"), + self.mds_enabled.then_some("mds"), + ].into_iter().flatten().collect::>(), "starting service listeners"); + Ok(tokio::spawn(async move { tokio::task::spawn(async move { let (task, result) = tokio::select! { result = mds_task => ("mds", result), result = phoenix_task => ("phoenix", result), - result = qcmp_task => ("qcmp", result), - result = udp_task => ("udp", result), + result = udp_task => ("udp/qcmp", result), result = xds_task => ("xds", result), }; @@ -341,20 +377,6 @@ impl Service { Ok((std::future::pending(), None)) } - /// Spawns an QCMP server if enabled, otherwise returns a future which never completes. - fn publish_qcmp( - &self, - shutdown_rx: &crate::signal::ShutdownRx, - ) -> crate::Result> + use<>> { - if self.qcmp_enabled { - tracing::info!(port=%self.qcmp_port, "starting qcmp service"); - let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?; - crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone())?; - } - - Ok(std::future::pending()) - } - /// Spawns an xDS server if enabled, otherwise returns a future which never completes. fn publish_mds( &self, @@ -409,178 +431,45 @@ impl Service { } #[allow(clippy::type_complexity)] - pub fn publish_udp( + pub fn listen_udp( &mut self, config: &Arc, + shutdown_rx: &crate::signal::ShutdownRx, ) -> eyre::Result<( impl Future> + use<>, Option, - Option>, + Option>, )> { if !self.udp_enabled && !self.qcmp_enabled { return Ok((either::Left(std::future::pending()), None, None)); } - tracing::info!(port=%self.udp_port, "starting udp service"); - - #[cfg(target_os = "linux")] - { - match self.spawn_xdp(config.clone(), self.xdp.force_xdp) { - Ok(xdp) => { - if let Some(xdp) = xdp { - self.qcmp_enabled = false; - return Ok((either::Left(std::future::pending()), Some(xdp), None)); - } else if self.xdp.force_xdp { - eyre::bail!("XDP was forced on, but failed to initialize"); - } - } - Err(err) => { - if self.xdp.force_xdp { - return Err(err); - } - - tracing::warn!( - ?err, - "failed to spawn XDP I/O loop, falling back to io-uring" - ); - } - } - } - if !self.udp_enabled { return Ok((either::Left(std::future::pending()), None, None)); } - self.spawn_user_space_router(config.clone()) - .map(|(fut, func, sp)| (either::Right(fut), Some(func), Some(sp))) - } - - /// Launches the user space implementation of the packet router using - /// sockets. This implementation uses a pool of buffers and sockets to - /// manage UDP sessions and sockets. On Linux this will use io-uring, where - /// as it will use epoll interfaces on non-Linux platforms. - #[allow(clippy::type_complexity)] - pub fn spawn_user_space_router( - &self, - config: Arc, - ) -> crate::Result<( - impl Future> + use<>, - Finalizer, - Arc, - )> { - // If we're on linux, we're using io-uring, but we're probably running in a container - // and may not be allowed to call io-uring related syscalls due to seccomp - // profiles, so do a quick check here to validate that we can call io_uring_setup - // https://www.man7.org/linux/man-pages/man2/io_uring_setup.2.html - #[cfg(target_os = "linux")] - { - if let Err(err) = io_uring::IoUring::new(2) { - fn in_container() -> bool { - let sched = match std::fs::read_to_string("/proc/1/sched") { - Ok(s) => s, - Err(error) => { - tracing::warn!( - %error, - "unable to read /proc/1/sched to determine if quilkin is in a container" - ); - return false; - } - }; - let Some(line) = sched.lines().next() else { - tracing::warn!("/proc/1/sched was empty"); - return false; - }; - let Some(proc) = line.split(' ').next() else { - tracing::warn!("first line of /proc/1/sched was empty"); - return false; - }; - proc != "init" && proc != "systemd" - } - - if err.kind() == std::io::ErrorKind::PermissionDenied && in_container() { - eyre::bail!( - "failed to call `io_uring_setup` due to EPERM ({err}), quilkin seems to be running inside a container meaning this is likely due to the seccomp profile not allowing the syscall" - ); - } else { - eyre::bail!("failed to call `io_uring_setup` due to {err}"); - } - } - } - - let socket = crate::net::raw_socket_with_reuse(self.udp_port)?; - let workers = self.udp_workers.get(); - let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024)); - - let mut worker_sends = Vec::with_capacity(workers); - let mut session_sends = Vec::with_capacity(workers); - for _ in 0..workers { - let queue = crate::net::queue(15)?; - session_sends.push(queue.0.clone()); - worker_sends.push(queue); - } - - let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone()); + let backend = if self.xdp.force_xdp { + Backend::NetworkInterface + } else if self.udp_completion_enabled { + Backend::Completion + } else if self.udp_poll_enabled { + Backend::Polling + } else { + tracing::debug!("querying network capabilities"); + Backend::query(&self.xdp) + }; - crate::components::proxy::packet_router::spawn_receivers( + let (fut, finaliser, sessions) = crate::net::io::listen( config, - socket, - worker_sends, - &sessions, - buffer_pool, + self.udp_enabled.then_some(self.udp_port), + self.qcmp_enabled.then_some(self.qcmp_port), + self.udp_workers.get(), + self.xdp.clone(), + shutdown_rx, + backend, )?; - Ok(( - std::future::pending(), - Box::from(move |_shutdown_rx: &crate::signal::ShutdownRx| {}), - sessions, - )) - } - - #[cfg(target_os = "linux")] - fn spawn_xdp(&self, config: Arc, force_xdp: bool) -> eyre::Result> { - use crate::net::xdp; - use eyre::{Context as _, ContextCompat as _}; - - // TODO: remove this once it's been more stabilized - if !force_xdp { - return Ok(None); - } - - let filters = config - .dyn_cfg - .filters() - .context("XDP requires a filter chain")? - .clone(); - let clusters = config - .dyn_cfg - .clusters() - .context("XDP requires a cluster map")? - .clone(); - - let config = crate::net::xdp::process::ConfigState { filters, clusters }; - - let udp_port = if self.udp_enabled { self.udp_port } else { 0 }; - let qcmp_port = if self.qcmp_enabled { self.qcmp_port } else { 0 }; - - tracing::info!(udp_port, qcmp_port, "setting up xdp module"); - let workers = xdp::setup_xdp_io(xdp::XdpConfig { - nic: self - .xdp - .network_interface - .as_deref() - .map_or(xdp::NicConfig::Default, xdp::NicConfig::Name), - external_port: udp_port, - qcmp_port, - maximum_packet_memory: self.xdp.maximum_memory, - require_zero_copy: self.xdp.force_zerocopy, - require_tx_checksum: self.xdp.force_tx_checksum_offload, - }) - .context("failed to setup XDP")?; - - let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?; - Ok(Some(Box::new(move |srx: &crate::signal::ShutdownRx| { - io_loop.shutdown(*srx.borrow() == crate::signal::ShutdownKind::Normal); - }))) + Ok((either::Right(fut), finaliser, sessions)) } } diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 502e1d600f..6b6aa76648 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -19,7 +19,7 @@ use crate::{ metrics, net::{ - DualStackEpollSocket, + Socket, phoenix::{DistanceMeasure, Measurement}, }, time::{DurationNanos, UtcTimestamp}, @@ -133,7 +133,7 @@ impl<'buf> PacketParser<'buf> { /// between nodes. #[derive(Debug, Clone)] pub struct QcmpMeasurement { - socket: Arc, + socket: Arc, #[cfg(test)] delay: Option, } @@ -141,7 +141,7 @@ pub struct QcmpMeasurement { impl QcmpMeasurement { pub fn new() -> crate::Result { Ok(Self { - socket: Arc::new(DualStackEpollSocket::new(0)?), + socket: Arc::new(Socket::polling_listen()?), #[cfg(test)] delay: None, }) @@ -150,7 +150,7 @@ impl QcmpMeasurement { #[cfg(test)] pub fn with_artificial_delay(delay: Duration) -> crate::Result { Ok(Self { - socket: Arc::new(DualStackEpollSocket::new(0)?), + socket: Arc::new(Socket::polling_listen()?), delay: Some(delay), }) } @@ -173,7 +173,7 @@ impl Measurement for QcmpMeasurement { let (size, _) = tokio::time::timeout( std::time::Duration::from_secs(5), - self.socket.recv_from(&mut recv), + self.socket.recv_from(&mut recv[..]), ) .await??; @@ -194,23 +194,19 @@ impl Measurement for QcmpMeasurement { } pub fn spawn( - socket: socket2::Socket, + socket: crate::net::Socket, mut shutdown_rx: crate::signal::ShutdownRx, ) -> crate::Result<()> { use tracing::{Instrument as _, instrument::WithSubscriber as _}; - - let port = crate::net::socket_port(&socket); - tokio::task::spawn( async move { let mut input_buf = [0u8; MAX_QCMP_PACKET_LEN]; - let socket = DualStackEpollSocket::new(port).unwrap(); let mut output_buf = QcmpPacket::default(); metrics::qcmp::active(true); loop { let result = tokio::select! { - result = socket.recv_from(&mut input_buf) => result, + result = socket.recv_from(&mut input_buf[..]) => result, _ = shutdown_rx.changed() => { metrics::qcmp::active(false); return @@ -255,7 +251,7 @@ pub fn spawn( "sending QCMP pong", ); - match track_error(socket.send_to(&output_buf, source).await, &asn_info) { + match track_error(socket.send_to(&*output_buf, source).await, &asn_info) { Ok(len) => { if len != output_buf.len() { tracing::error!(%source, "failed to send entire QCMP pong response, expected {} but only sent {len}", output_buf.len()); @@ -562,8 +558,6 @@ pub enum Error { #[cfg(test)] mod tests { - use crate::net::raw_socket_with_reuse; - use super::*; #[test] @@ -697,8 +691,8 @@ mod tests { #[tokio::test] #[cfg_attr(target_os = "macos", ignore)] async fn qcmp_measurement() { - let socket = raw_socket_with_reuse(0).unwrap(); - let addr = socket.local_addr().unwrap().as_socket().unwrap(); + let socket = Socket::polling_listen().unwrap(); + let addr = socket.local_addr(); let (_tx, rx) = crate::signal::channel(Default::default()); spawn(socket, rx).unwrap(); diff --git a/src/components.rs b/src/components.rs index 5609172c1d..b96f56a3e4 100644 --- a/src/components.rs +++ b/src/components.rs @@ -17,7 +17,6 @@ pub mod admin; pub mod agent; pub mod manage; -pub mod proxy; pub mod relay; /// Args common across all components diff --git a/src/components/proxy.rs b/src/components/proxy.rs deleted file mode 100644 index 47cd8041a9..0000000000 --- a/src/components/proxy.rs +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -pub(crate) mod error; -pub mod packet_router; -pub(crate) mod sessions; - -use super::RunArgs; -pub use error::PipelineError; -pub use sessions::SessionPool; -use std::{ - net::SocketAddr, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, -}; - -#[derive(Clone, Debug)] -pub struct Ready { - pub idle_request_interval: std::time::Duration, - // RwLock as this check is conditional on the proxy using xDS. - pub xds_is_healthy: Arc>>>, -} - -impl Default for Ready { - fn default() -> Self { - Self { - idle_request_interval: crate::components::admin::IDLE_REQUEST_INTERVAL, - xds_is_healthy: Default::default(), - } - } -} - -impl Ready { - #[inline] - pub fn is_ready(&self) -> Option { - self.xds_is_healthy - .read() - .as_ref() - .map(|health| health.load(Ordering::SeqCst)) - } -} - -pub struct ToTokens { - /// The number of tokens to assign to each `to` address - pub count: usize, - /// The size of each token - pub length: usize, -} - -pub struct Proxy { - pub num_workers: std::num::NonZeroUsize, - pub mmdb: Option, - pub management_servers: Vec, - pub to: Vec, - pub to_tokens: Option, - pub socket: Option, - pub qcmp: socket2::Socket, - pub phoenix: crate::net::TcpListener, - pub notifier: Option>, - pub xdp: crate::cli::proxy::XdpOptions, - pub termination_timeout: Option, -} - -impl Default for Proxy { - fn default() -> Self { - let qcmp = crate::net::raw_socket_with_reuse(0).unwrap(); - let phoenix = crate::net::TcpListener::bind(Some(crate::net::socket_port(&qcmp))).unwrap(); - - Self { - num_workers: std::num::NonZeroUsize::new(1).unwrap(), - mmdb: None, - management_servers: Vec::new(), - to: Vec::new(), - to_tokens: None, - socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()), - qcmp, - phoenix, - notifier: None, - xdp: Default::default(), - termination_timeout: None, - } - } -} - -impl Proxy { - pub async fn run( - mut self, - RunArgs { - config, - ready, - mut shutdown_rx, - }: RunArgs, - initialized: Option>, - ) -> crate::Result<()> { - let _mmdb_task = self.mmdb.as_ref().map(|source| { - let source = source.clone(); - tokio::spawn(async move { - while let Err(error) = - tryhard::retry_fn(|| crate::MaxmindDb::update(source.clone())) - .retries(10) - .exponential_backoff(crate::config::BACKOFF_INITIAL_DELAY) - .await - { - tracing::warn!(%error, "error updating maxmind database"); - } - }) - }); - - let Some(clusters) = config.dyn_cfg.clusters() else { - eyre::bail!("empty clusters were not created") - }; - - if !self.to.is_empty() { - let endpoints = match &self.to_tokens { - Some(tt) => { - let (unique, overflow) = 256u64.overflowing_pow(tt.length as _); - if overflow { - panic!( - "can't generate {} tokens of length {} maximum is {}", - self.to.len() * tt.count, - tt.length, - u64::MAX, - ); - } - - if unique < (self.to.len() * tt.count) as u64 { - panic!( - "we require {} unique tokens but only {unique} can be generated", - self.to.len() * tt.count, - ); - } - - { - use crate::filters::StaticFilter as _; - let Some(filters) = config.dyn_cfg.filters() else { - eyre::bail!("empty filters were not created") - }; - - filters.store(Arc::new( - crate::filters::FilterChain::try_create([ - crate::filters::Capture::as_filter_config( - crate::filters::capture::Config { - metadata_key: crate::filters::capture::CAPTURED_BYTES - .into(), - strategy: crate::filters::capture::Strategy::Suffix( - crate::filters::capture::Suffix { - size: tt.length as _, - remove: true, - }, - ), - }, - ) - .unwrap(), - crate::filters::TokenRouter::as_filter_config(None).unwrap(), - ]) - .unwrap(), - )); - } - - let count = tt.count as u64; - - self.to - .iter() - .enumerate() - .map(|(ind, sa)| { - let mut tokens = std::collections::BTreeSet::new(); - let start = ind as u64 * count; - for i in start..(start + count) { - tokens.insert(i.to_le_bytes()[..tt.length].to_vec()); - } - - crate::net::endpoint::Endpoint::with_metadata( - (*sa).into(), - crate::net::endpoint::Metadata { tokens }, - ) - }) - .collect() - } - _ => self - .to - .iter() - .cloned() - .map(crate::net::endpoint::Endpoint::from) - .collect(), - }; - - clusters.modify(|clusters| { - clusters.insert(None, None, endpoints); - }); - } - - if !clusters.read().has_endpoints() && self.management_servers.is_empty() { - return Err(eyre::eyre!( - "`quilkin proxy` requires at least one `to` address or `management_server` endpoint." - )); - } - - #[allow(clippy::type_complexity)] - const SUBS: &[(&str, &[(&str, Vec)])] = &[ - ( - "9", - &[ - (crate::xds::CLUSTER_TYPE, Vec::new()), - (crate::xds::DATACENTER_TYPE, Vec::new()), - (crate::xds::FILTER_CHAIN_TYPE, Vec::new()), - ], - ), - ( - "", - &[ - (crate::xds::CLUSTER_TYPE, Vec::new()), - (crate::xds::DATACENTER_TYPE, Vec::new()), - (crate::xds::LISTENER_TYPE, Vec::new()), - ], - ), - ]; - - if !self.management_servers.is_empty() { - { - let mut lock = ready.xds_is_healthy.write(); - let check: Arc = <_>::default(); - *lock = Some(check.clone()); - } - - let id = config.id(); - - std::thread::Builder::new() - .name("proxy-subscription".into()) - .spawn({ - let config = config.clone(); - let mut shutdown_rx = shutdown_rx.clone(); - let management_servers = self.management_servers.clone(); - let tx = self.notifier.clone(); - - move || { - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name_fn(|| { - static ATOMIC_ID: std::sync::atomic::AtomicUsize = - std::sync::atomic::AtomicUsize::new(0); - let id = - ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - format!("proxy-subscription-{id}") - }) - .build() - .unwrap(); - - runtime.block_on(async move { - let client = crate::net::xds::AdsClient::connect( - String::clone(&id), - management_servers, - ) - .await?; - - let xds_is_healthy = - ready.xds_is_healthy.read().as_ref().unwrap().clone(); - - let _stream = client - .delta_subscribe(config.clone(), xds_is_healthy.clone(), tx, SUBS) - .await - .map_err(|_err| eyre::eyre!("failed to acquire delta stream"))?; - - let _ = shutdown_rx.changed().await; - Ok::<_, eyre::Error>(()) - }) - } - }) - .expect("failed to spawn proxy-subscription thread"); - } - - // TODO: Remove this once the CLI is fully moved over. - let udp_port = crate::net::socket_port(&self.socket.take().unwrap()); - let qcmp_port = crate::net::socket_port(&std::mem::replace( - &mut self.qcmp, - crate::net::raw_socket_with_reuse(0).unwrap(), - )); - let phoenix_port = std::mem::replace( - &mut self.phoenix, - crate::net::TcpListener::bind(None).unwrap(), - ) - .port(); - - let svc_task = crate::cli::Service::default() - .udp() - .udp_port(udp_port) - .xdp(self.xdp) - .qcmp() - .qcmp_port(qcmp_port) - .phoenix() - .phoenix_port(phoenix_port) - .termination_timeout(self.termination_timeout) - .spawn_services(&config, &shutdown_rx)?; - - tracing::info!("Quilkin is ready"); - if let Some(initialized) = initialized { - let _ = initialized.send(()); - } - - shutdown_rx - .changed() - .await - .map_err(|error| eyre::eyre!(error))?; - - if let Ok(Err(error)) = svc_task.await { - tracing::error!(%error, "Quilkin proxy services exited with error"); - } - - Ok(()) - } -} diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs deleted file mode 100644 index 1debb33347..0000000000 --- a/src/components/proxy/packet_router.rs +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::{ - PipelineError, SessionPool, - sessions::{SessionKey, SessionManager}, -}; -use crate::{ - Config, - filters::{Filter as _, ReadContext}, - metrics, -}; -use std::{net::SocketAddr, sync::Arc}; - -#[cfg(target_os = "linux")] -mod io_uring; -#[cfg(not(target_os = "linux"))] -mod reference; - -/// Representation of an immutable set of bytes pulled from the network, this trait -/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) -/// -/// Use [`PacketMut`] if you need a mutable representation. -pub trait Packet: Sized { - /// Returns the underlying slice of bytes representing the packet. - fn as_slice(&self) -> &[u8]; - - /// Returns the size of the packet. - fn len(&self) -> usize; - - /// Returns whether the given packet is empty. - fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -/// Representation of an mutable set of bytes pulled from the network, this trait -/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) -pub trait PacketMut: Sized + Packet { - type FrozenPacket: Packet; - fn remove_head(&mut self, length: usize); - fn remove_tail(&mut self, length: usize); - fn extend_head(&mut self, bytes: &[u8]); - fn extend_tail(&mut self, bytes: &[u8]); - /// Returns an immutable version of the packet, this allows certain types - /// return a type that can be more cheaply cloned and shared. - fn freeze(self) -> Self::FrozenPacket; -} - -/// Packet received from local port -pub(crate) struct DownstreamPacket

{ - pub(crate) contents: P, - pub(crate) source: SocketAddr, -} - -impl DownstreamPacket

{ - #[inline] - pub(crate) fn process>( - self, - worker_id: usize, - config: &Arc, - sessions: &S, - destinations: &mut Vec, - ) { - tracing::trace!( - id = worker_id, - size = self.contents.len(), - source = %self.source, - "received packet from downstream" - ); - - let timer = metrics::processing_time(metrics::READ).start_timer(); - if let Err(error) = self.process_inner(config, sessions, destinations) { - let discriminant = error.discriminant(); - - error.inc_system_errors_total(metrics::READ, &metrics::EMPTY); - metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); - } - - timer.stop_and_record(); - } - - /// Processes a packet by running it through the filter chain. - #[inline] - fn process_inner>( - self, - config: &Arc, - sessions: &S, - destinations: &mut Vec, - ) -> Result<(), PipelineError> { - let Some(clusters) = config - .dyn_cfg - .clusters() - .filter(|c| c.read().has_endpoints()) - else { - tracing::trace!("no upstream endpoints"); - return Err(PipelineError::NoUpstreamEndpoints); - }; - - let cm = clusters.clone_value(); - let Some(filters) = config.dyn_cfg.filters() else { - return Err(PipelineError::Filter(crate::filters::FilterError::Custom( - "no filters loaded", - ))); - }; - - #[cfg(not(debug_assertions))] - { - match self.source.ip() { - std::net::IpAddr::V4(ipv4) => { - if ipv4.is_loopback() || ipv4.is_multicast() || ipv4.is_broadcast() { - return Err(PipelineError::DisallowedSourceIP(self.source.ip())); - } - } - std::net::IpAddr::V6(ipv6) => { - if ipv6.is_loopback() || ipv6.is_multicast() { - return Err(PipelineError::DisallowedSourceIP(self.source.ip())); - } - } - } - } - - let mut context = ReadContext::new(&cm, self.source.into(), self.contents, destinations); - filters.read(&mut context).map_err(PipelineError::Filter)?; - - let ReadContext { contents, .. } = context; - - // Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer - // into an immutable one with its own internal arc so it can be cloned - // cheaply and returned to the pool once all references are dropped - let contents = contents.freeze(); - - for epa in destinations.drain(0..) { - let session_key = SessionKey { - source: self.source, - dest: epa.to_socket_addr()?, - }; - - sessions.send(session_key, &contents)?; - } - - Ok(()) - } -} - -/// Represents the required arguments to run a worker task that -/// processes packets received downstream. -pub struct DownstreamReceiveWorkerConfig { - /// ID of the worker. - pub worker_id: usize, - pub port: u16, - pub config: Arc, - pub sessions: Arc, - pub buffer_pool: Arc, -} - -/// Spawns a background task that sits in a loop, receiving packets from the passed in socket. -/// Each received packet is placed on a queue to be processed by a worker task. -/// This function also spawns the set of worker tasks responsible for consuming packets -/// off the aforementioned queue and processing them through the filter chain and session -/// pipeline. -pub fn spawn_receivers( - config: Arc, - socket: socket2::Socket, - worker_sends: Vec, - sessions: &Arc, - buffer_pool: Arc, -) -> crate::Result<()> { - let port = crate::net::socket_port(&socket); - - for (worker_id, ws) in worker_sends.into_iter().enumerate() { - let worker = DownstreamReceiveWorkerConfig { - worker_id, - port, - config: config.clone(), - sessions: sessions.clone(), - buffer_pool: buffer_pool.clone(), - }; - - worker.spawn(ws)?; - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - #![cfg(not(debug_assertions))] - - use quilkin_xds::locality::Locality; - - use crate::collections::BufferPool; - use crate::net::Endpoint; - use crate::test::alloc_buffer; - - use super::*; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; - use std::net::{SocketAddrV4, SocketAddrV6}; - - // Ensure we disallow certain source IP addresses to protect against UDP amplification attacks - #[tokio::test] - async fn disallowed_ips() { - let nl1 = Locality::with_region("nl-1"); - let endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into()); - - let config = Arc::new(Config::default_agent().cluster( - None, - Some(nl1.clone()), - [endpoint.clone()].into(), - )); - let buffer_pool = Arc::new(BufferPool::new(1, 10)); - let session_manager = SessionPool::new(config.clone(), vec![], buffer_pool.clone()); - - let packet_data: [u8; 4] = [1, 2, 3, 4]; - for ip in [ - IpAddr::V4(Ipv4Addr::LOCALHOST), - IpAddr::V4(Ipv4Addr::BROADCAST), - // multicast = 224.0.0.0/4 - IpAddr::V4(Ipv4Addr::new(224, 0, 0, 0)), - IpAddr::V4(Ipv4Addr::new(239, 255, 255, 255)), - IpAddr::V6(Ipv6Addr::LOCALHOST), - // multicast = any address starting with 0xff - IpAddr::V6(Ipv6Addr::new(0xff00, 0, 0, 0, 0, 0, 0, 0)), - ] { - let packet = DownstreamPacket { - contents: alloc_buffer(packet_data), - source: match ip { - IpAddr::V4(ipv4) => SocketAddr::V4(SocketAddrV4::new(ipv4, 0)), - IpAddr::V6(ipv6) => SocketAddr::V6(SocketAddrV6::new(ipv6, 0, 0, 0)), - }, - }; - - let mut endpoints = vec![endpoint.address.clone()]; - let res = packet.process_inner(&config, &session_manager, &mut endpoints); - - assert_eq!(res, Err(PipelineError::DisallowedSourceIP(ip))); - } - } -} diff --git a/src/components/proxy/packet_router/io_uring.rs b/src/components/proxy/packet_router/io_uring.rs deleted file mode 100644 index f2d2f0361e..0000000000 --- a/src/components/proxy/packet_router/io_uring.rs +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use eyre::Context as _; - -impl super::DownstreamReceiveWorkerConfig { - pub fn spawn(self, pending_sends: crate::net::PacketQueue) -> eyre::Result<()> { - use crate::net::io_uring; - - let Self { - worker_id, - port, - config, - sessions, - buffer_pool, - } = self; - - let socket = - crate::net::DualStackLocalSocket::new(port).context("failed to bind socket")?; - - let io_loop = io_uring::IoUringLoop::new(2000, socket)?; - io_loop - .spawn( - format!("packet-router-{worker_id}"), - io_uring::PacketProcessorCtx::Router { - config, - sessions, - worker_id, - destinations: Vec::with_capacity(1), - }, - pending_sends, - buffer_pool, - ) - .context("failed to spawn io-uring loop") - } -} diff --git a/src/components/proxy/packet_router/reference.rs b/src/components/proxy/packet_router/reference.rs deleted file mode 100644 index 7ff1664b74..0000000000 --- a/src/components/proxy/packet_router/reference.rs +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//! The reference implementation is used for non-Linux targets - -impl super::DownstreamReceiveWorkerConfig { - pub fn spawn(self, packet_queue: crate::net::PacketQueue) -> eyre::Result<()> { - let Self { - worker_id, - port, - config, - sessions, - buffer_pool, - } = self; - - let thread_span = - uring_span!(tracing::debug_span!("receiver", id = worker_id).or_current()); - let (tx, mut rx) = tokio::sync::oneshot::channel(); - - let worker = uring_spawn!(thread_span, async move { - crate::metrics::game_traffic_tasks().inc(); - let mut last_received_at = None; - let socket = crate::net::DualStackLocalSocket::new(port) - .unwrap() - .make_refcnt(); - - tracing::trace!(port, "bound worker"); - let send_socket = socket.clone(); - - let inner_task = async move { - let (packet_queue, mut sends_rx) = packet_queue; - let mut sends_double_buffer = Vec::with_capacity(packet_queue.capacity()); - - while sends_rx.changed().await.is_ok() { - if !*sends_rx.borrow() { - tracing::trace!("io loop shutdown requested"); - break; - } - - sends_double_buffer = packet_queue.swap(sends_double_buffer); - - for packet in sends_double_buffer.drain(..sends_double_buffer.len()) { - let (result, _) = send_socket - .send_to(packet.data, packet.destination.as_socket().unwrap()) - .await; - let asn_info = packet.asn_info.as_ref().into(); - match result { - Ok(size) => { - crate::metrics::packets_total(crate::metrics::WRITE, &asn_info) - .inc(); - crate::metrics::bytes_total(crate::metrics::WRITE, &asn_info) - .inc_by(size as u64); - } - Err(error) => { - let source = error.to_string(); - crate::metrics::errors_total( - crate::metrics::WRITE, - &source, - &asn_info, - ) - .inc(); - crate::metrics::packets_dropped_total( - crate::metrics::WRITE, - &source, - &asn_info, - ) - .inc(); - } - } - } - } - - let _ = tx.send(()); - }; - - cfg_if::cfg_if! { - if #[cfg(debug_assertions)] { - uring_inner_spawn!(inner_task.instrument(tracing::debug_span!("upstream").or_current())); - } else { - uring_inner_spawn!(inner_task); - } - } - - let mut destinations = Vec::with_capacity(1); - - loop { - // Initialize a buffer for the UDP packet. We use the maximum size of a UDP - // packet, which is the maximum value of 16 a bit integer. - let buffer = buffer_pool.clone().alloc(); - - tokio::select! { - received = socket.recv_from(buffer) => { - let received_at = crate::time::UtcTimestamp::now(); - let (result, buffer) = received; - - match result { - Ok((_size, mut source)) => { - source.set_ip(source.ip().to_canonical()); - let packet = super::DownstreamPacket { contents: buffer, source }; - - if let Some(last_received_at) = last_received_at { - crate::metrics::packet_jitter( - crate::metrics::READ, - &crate::metrics::EMPTY, - ) - .set((received_at - last_received_at).nanos()); - } - last_received_at = Some(received_at); - - packet.process( - worker_id, - &config, - &sessions, - &mut destinations, - ); - } - Err(error) => { - tracing::error!(%error, "error receiving packet"); - return; - } - } - } - _ = &mut rx => { - crate::metrics::game_traffic_task_closed().inc(); - tracing::debug!("Closing downstream socket loop, shutdown requested"); - return; - } - } - } - }); - - use eyre::WrapErr as _; - worker.recv().context("failed to spawn receiver task")?; - Ok(()) - } -} diff --git a/src/config/providersv2.rs b/src/config/providersv2.rs index baa8e01f25..d9f4cdec0e 100644 --- a/src/config/providersv2.rs +++ b/src/config/providersv2.rs @@ -140,7 +140,7 @@ pub struct Providers { env = "QUILKIN_PROVIDERS_STATIC_ENDPOINTS" )] endpoints: Vec, - /// Assigns dynamic tokens to each address in the `--to` argument + /// Assigns dynamic tokens to each address in the `provider.static.endpoints` argument /// /// Format is `:` #[clap( @@ -228,6 +228,11 @@ impl Providers { self } + pub fn xds_endpoints(mut self, ns: Vec) -> Self { + self.xds_endpoints = ns; + self + } + fn static_enabled(&self) -> bool { !self.endpoints.is_empty() } @@ -242,31 +247,31 @@ impl Providers { .as_ref() .map(|tt| { let Some((count, length)) = tt.split_once(':') else { - eyre::bail!("--to-tokens `{tt}` is invalid, it must have a `:` separator") + eyre::bail!("provider.static.endpoint_tokens: `{tt}` is invalid, it must have a `:` separator") }; - let count = count.parse()?; - let length = length.parse()?; + let count: usize = count.parse()?; + let length: usize = length.parse()?; - Ok(crate::components::proxy::ToTokens { count, length }) + Ok((count, length)) }) .transpose()?; - let endpoints = if let Some(tt) = endpoint_tokens { - let (unique, overflow) = 256u64.overflowing_pow(tt.length as _); + let endpoints = if let Some((count, length)) = endpoint_tokens { + let (unique, overflow) = 256u64.overflowing_pow(length as _); if overflow { panic!( "can't generate {} tokens of length {} maximum is {}", - self.endpoints.len() * tt.count, - tt.length, + self.endpoints.len() * count, + length, u64::MAX, ); } - if unique < (self.endpoints.len() * tt.count) as u64 { + if unique < (self.endpoints.len() * count) as u64 { panic!( "we require {} unique tokens but only {unique} can be generated", - self.endpoints.len() * tt.count, + self.endpoints.len() * count, ); } @@ -279,7 +284,7 @@ impl Providers { metadata_key: crate::filters::capture::CAPTURED_BYTES.into(), strategy: crate::filters::capture::Strategy::Suffix( crate::filters::capture::Suffix { - size: tt.length as _, + size: length as _, remove: true, }, ), @@ -292,7 +297,7 @@ impl Providers { )); } - let count = tt.count as u64; + let count = count as u64; self.endpoints .iter() @@ -301,7 +306,7 @@ impl Providers { let mut tokens = std::collections::BTreeSet::new(); let start = ind as u64 * count; for i in start..(start + count) { - tokens.insert(i.to_le_bytes()[..tt.length].to_vec()); + tokens.insert(i.to_le_bytes()[..length].to_vec()); } crate::net::endpoint::Endpoint::with_metadata( @@ -332,6 +337,24 @@ impl Providers { Ok(tokio::spawn(std::future::pending())) } + pub fn spawn_mmdb_provider(&self) -> JoinHandle> { + let Some(source) = self.mmdb.clone() else { + return tokio::spawn(std::future::pending()); + }; + + tokio::spawn(async move { + while let Err(error) = tryhard::retry_fn(|| crate::MaxmindDb::update(source.clone())) + .retries(10) + .exponential_backoff(crate::config::BACKOFF_INITIAL_DELAY) + .await + { + tracing::warn!(%error, "error updating maxmind database"); + } + + Ok(()) + }) + } + pub fn spawn_k8s_provider( &self, health_check: Arc, diff --git a/src/filters.rs b/src/filters.rs index 489b25b3db..64c1ad35d4 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -50,6 +50,7 @@ pub mod prelude { #[doc(inline)] pub use self::{ capture::Capture, + chain::FilterChain, concatenate::Concatenate, debug::Debug, drop::Drop, @@ -68,10 +69,10 @@ pub use self::{ write::WriteContext, }; -pub use crate::test::TestFilter; - -pub use self::chain::FilterChain; -pub use crate::components::proxy::packet_router::{Packet, PacketMut}; +pub use crate::{ + net::packet::{PacketData, PacketData as Packet, PacketDataMut, PacketDataMut as PacketMut}, + test::TestFilter, +}; #[enum_dispatch::enum_dispatch(Filter)] pub enum FilterKind { diff --git a/src/lib.rs b/src/lib.rs index 74fecba7b2..dd3d999585 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,10 +41,7 @@ pub use quilkin_proto as generated; pub type Result = std::result::Result; #[doc(inline)] -pub use self::{ - cli::{Cli, Proxy}, - config::Config, -}; +pub use self::{cli::Cli, config::Config}; pub use quilkin_macros::include_proto; diff --git a/src/net.rs b/src/net.rs index 02be5ebe64..132aaf2fcb 100644 --- a/src/net.rs +++ b/src/net.rs @@ -14,10 +14,7 @@ * limitations under the License. */ -pub mod packet; - /// On linux spawns a io-uring runtime + thread, everywhere else spawns a regular tokio task. -#[cfg(not(target_os = "linux"))] macro_rules! uring_spawn { ($span:expr_2021, $future:expr_2021) => {{ let (tx, rx) = std::sync::mpsc::channel::<()>(); @@ -39,14 +36,6 @@ macro_rules! uring_spawn { }}; } -/// On linux spawns a io-uring task, everywhere else spawns a regular tokio task. -#[cfg(not(target_os = "linux"))] -macro_rules! uring_inner_spawn { - ($future:expr_2021) => { - tokio::spawn($future); - }; -} - /// Allows creation of spans only when `debug_assertions` are enabled, to avoid /// hitting the cap of 4096 threads that is unconfigurable in /// `tracing_subscriber` -> `sharded_slab` for span ids @@ -64,248 +53,24 @@ macro_rules! uring_span { pub mod cluster; pub mod endpoint; -pub(crate) mod maxmind_db; +pub mod io; +pub mod packet; pub mod phoenix; +pub(crate) mod error; +pub(crate) mod maxmind_db; +pub mod sessions; + pub use quilkin_xds as xds; pub use xds::net::TcpListener; -use std::{ - io, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, -}; - -use socket2::{Protocol, Socket, Type}; - -cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - use std::net::UdpSocket; - - pub(crate) mod io_uring; - pub mod xdp; - } else { - use tokio::net::UdpSocket; - } -} - pub use self::{ cluster::ClusterMap, endpoint::{Endpoint, EndpointAddress}, + error::PipelineError, + io::{Socket, Socket as DualStackLocalSocket, Socket as DualStackEpollSocket, SystemSocket}, }; -pub use self::packet::{PacketQueue, PacketQueueSender, queue}; - -fn socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result { - cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - raw_socket_with_reuse_and_address(addr) - .map(From::from) - } else { - epoll_socket_with_reuse_and_address(addr) - } - } -} - -fn epoll_socket_with_reuse(port: u16) -> std::io::Result { - raw_socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into()) - .map(From::from) - .and_then(tokio::net::UdpSocket::from_std) -} - -fn epoll_socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result { - raw_socket_with_reuse_and_address(addr) - .map(From::from) - .and_then(tokio::net::UdpSocket::from_std) -} - -#[inline] -pub fn raw_socket_with_reuse(port: u16) -> std::io::Result { - raw_socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into()) -} - -pub fn raw_socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result { - let domain = match addr { - SocketAddr::V4(_) => socket2::Domain::IPV4, - SocketAddr::V6(_) => socket2::Domain::IPV6, - }; - - let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; - enable_reuse(&sock)?; - sock.set_nonblocking(true)?; - if domain == socket2::Domain::IPV6 { - // be explicit so we can have dual stack sockets. - sock.set_only_v6(false)?; - } - sock.bind(&addr.into())?; - - Ok(sock) -} - -#[inline] -pub fn socket_port(socket: &socket2::Socket) -> u16 { - match socket.local_addr().unwrap().as_socket().unwrap() { - SocketAddr::V4(addr) => addr.port(), - SocketAddr::V6(addr) => addr.port(), - } -} - -#[cfg(not(target_family = "windows"))] -fn enable_reuse(sock: &Socket) -> io::Result<()> { - sock.set_reuse_port(true)?; - Ok(()) -} - -#[cfg(target_family = "windows")] -fn enable_reuse(sock: &Socket) -> io::Result<()> { - sock.set_reuse_address(true)?; - Ok(()) -} - -/// An ipv6 socket that can accept and send data from either a local ipv4 address or ipv6 address -/// with port reuse enabled and `only_v6` set to false. -pub struct DualStackLocalSocket { - socket: UdpSocket, - local_addr: SocketAddr, -} - -impl DualStackLocalSocket { - pub fn from_raw(socket: Socket) -> Self { - let socket: std::net::UdpSocket = socket.into(); - let local_addr = socket.local_addr().unwrap(); - cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - let socket = socket; - } else { - // This is only for macOS and Windows (non-production platforms), - // and should never happen anyway, so unwrap here is fine. - let socket = UdpSocket::from_std(socket).unwrap(); - } - } - Self { socket, local_addr } - } - - pub fn new(port: u16) -> std::io::Result { - raw_socket_with_reuse(port).map(Self::from_raw) - } - - pub fn bind_local(port: u16) -> std::io::Result { - let local_addr = (Ipv6Addr::LOCALHOST, port).into(); - let socket = socket_with_reuse_and_address(local_addr)?; - Ok(Self { socket, local_addr }) - } - - pub fn local_ipv4_addr(&self) -> io::Result { - Ok(match self.local_addr { - SocketAddr::V4(_) => self.local_addr, - SocketAddr::V6(_) => (Ipv4Addr::UNSPECIFIED, self.local_addr.port()).into(), - }) - } - - pub fn local_ipv6_addr(&self) -> io::Result { - Ok(match self.local_addr { - SocketAddr::V4(v4addr) => SocketAddr::new( - IpAddr::V6(v4addr.ip().to_ipv6_mapped()), - self.local_addr.port(), - ), - SocketAddr::V6(_) => self.local_addr, - }) - } - - cfg_if::cfg_if! { - if #[cfg(not(target_os = "linux"))] { - pub async fn recv_from>(&self, mut buf: B) -> (io::Result<(usize, SocketAddr)>, B) { - let result = self.socket.recv_from(&mut buf).await; - (result, buf) - } - - pub async fn send_to>(&self, buf: B, target: SocketAddr) -> (io::Result, B) { - let result = self.socket.send_to(&buf, target).await; - (result, buf) - } - } else { - #[inline] - pub fn raw_fd(&self) -> ::io_uring::types::Fd { - use std::os::fd::AsRawFd; - ::io_uring::types::Fd(self.socket.as_raw_fd()) - } - } - } - - pub fn make_refcnt(self) -> DualStackLocalSocketRc { - DualStackLocalSocketRc::new(self) - } -} - -cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - pub type DualStackLocalSocketRc = std::rc::Rc; - } else { - pub type DualStackLocalSocketRc = std::sync::Arc; - } -} - -/// The same as [`DualStackSocket`] but uses epoll instead of uring. -#[derive(Debug)] -pub struct DualStackEpollSocket { - socket: tokio::net::UdpSocket, -} - -impl DualStackEpollSocket { - pub fn new(port: u16) -> std::io::Result { - Ok(Self { - socket: epoll_socket_with_reuse(port)?, - }) - } - - pub fn bind_local(port: u16) -> std::io::Result { - Ok(Self { - socket: epoll_socket_with_reuse_and_address((Ipv6Addr::LOCALHOST, port).into())?, - }) - } - - /// Primarily used for testing of ipv4 vs ipv6 addresses. - pub(crate) fn new_with_address(addr: SocketAddr) -> std::io::Result { - Ok(Self { - socket: epoll_socket_with_reuse_and_address(addr)?, - }) - } - - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.socket.recv_from(buf).await - } - - pub fn local_addr(&self) -> io::Result { - self.socket.local_addr() - } - - pub fn local_ipv4_addr(&self) -> io::Result { - let addr = self.socket.local_addr()?; - match addr { - SocketAddr::V4(_) => Ok(addr), - SocketAddr::V6(_) => Ok((Ipv4Addr::UNSPECIFIED, addr.port()).into()), - } - } - - pub fn local_ipv6_addr(&self) -> io::Result { - let addr = self.socket.local_addr()?; - match addr { - SocketAddr::V4(v4addr) => Ok(SocketAddr::new( - IpAddr::V6(v4addr.ip().to_ipv6_mapped()), - addr.port(), - )), - SocketAddr::V6(_) => Ok(addr), - } - } - - pub async fn send_to( - &self, - buf: &[u8], - target: A, - ) -> io::Result { - self.socket.send_to(buf, target).await - } -} - #[cfg(test)] mod tests { use std::{ @@ -321,26 +86,26 @@ mod tests { #[tokio::test] async fn dual_stack_socket_reusable() { let expected = available_addr(AddressType::Random).await; - let socket = super::DualStackEpollSocket::new(expected.port()).unwrap(); - let addr = socket.local_ipv4_addr().unwrap(); + let socket = super::Socket::polling_from_port(expected.port()).unwrap(); + let addr = socket.local_ipv4_addr(); match expected { - SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr().unwrap()), - SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr().unwrap()), + SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr()), + SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr()), } - assert_eq!(expected.port(), socket.local_ipv4_addr().unwrap().port()); - assert_eq!(expected.port(), socket.local_ipv6_addr().unwrap().port()); + assert_eq!(expected.port(), socket.local_ipv4_addr().port()); + assert_eq!(expected.port(), socket.local_ipv6_addr().port()); // should be able to do it a second time, since we are reusing the address. - let socket = super::DualStackEpollSocket::new(expected.port()).unwrap(); + let socket = super::Socket::polling_from_port(expected.port()).unwrap(); match expected { - SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr().unwrap()), - SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr().unwrap()), + SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr()), + SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr()), } - assert_eq!(addr.port(), socket.local_ipv4_addr().unwrap().port()); - assert_eq!(addr.port(), socket.local_ipv6_addr().unwrap().port()); + assert_eq!(addr.port(), socket.local_ipv4_addr().port()); + assert_eq!(addr.port(), socket.local_ipv6_addr().port()); } #[tokio::test] @@ -355,7 +120,7 @@ mod tests { let msg = "hello"; let addr = echo_addr.to_socket_addr().unwrap(); - socket.send_to(msg.as_bytes(), &addr).await.unwrap(); + socket.send_to(msg.as_bytes(), addr).await.unwrap(); assert_eq!( msg, timeout(Duration::from_secs(5), rx.recv()) @@ -376,7 +141,7 @@ mod tests { AddressKind::Name(_) => unreachable!(), }; - socket.send_to(msg.as_bytes(), &opp_addr).await.unwrap(); + socket.send_to(msg.as_bytes(), opp_addr).await.unwrap(); assert_eq!( msg, timeout(Duration::from_secs(5), rx.recv()) @@ -389,7 +154,7 @@ mod tests { // stack socket. let (mut rx, socket) = t.open_ipv4_socket_and_recv_multiple_packets().await; socket - .send_to(msg.as_bytes(), &ipv4_echo_addr) + .send_to(msg.as_bytes(), ipv4_echo_addr) .await .unwrap(); assert_eq!( diff --git a/src/components/proxy/error.rs b/src/net/error.rs similarity index 100% rename from src/components/proxy/error.rs rename to src/net/error.rs diff --git a/src/net/io.rs b/src/net/io.rs new file mode 100644 index 0000000000..48b08f40ec --- /dev/null +++ b/src/net/io.rs @@ -0,0 +1,231 @@ +/* + * Copyright 2024 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::Arc; + +use super::sessions::SessionPool; +use crate::Config; + +pub mod completion; +pub mod nic; +pub mod poll; +pub(crate) mod socket; + +pub use socket::{Socket, SystemSocket}; + +/// Spawns a background task that sits in a loop, receiving packets from the passed in socket. +/// Each received packet is placed on a queue to be processed by a worker task. +/// This function also spawns the set of worker tasks responsible for consuming packets +/// off the aforementioned queue and processing them through the filter chain and session +/// pipeline. +#[allow(clippy::type_complexity)] +pub fn listen( + config: &Arc, + udp_port: Option, + qcmp_port: Option, + workers: usize, + xdp: crate::cli::XdpOptions, + shutdown_rx: &crate::signal::ShutdownRx, + backend: Backend, +) -> crate::Result<( + impl Future> + use<>, + Option, + Option>, +)> { + if udp_port.is_none() && qcmp_port.is_none() { + panic!("bug: `net::io::listen` requires either `udp` or `qcmp` to be set"); + } + + tracing::info!(%backend, "network I/O interface chosen"); + match backend { + Backend::NetworkInterface => { + let finalizer = nic::listen( + config, + udp_port.unwrap_or_default(), + qcmp_port.unwrap_or_default(), + xdp, + )?; + Ok((either::Left(std::future::pending()), finalizer, None)) + } + backend => { + if let Some(socket) = qcmp_port + .map(|port| backend.socket_from_port(port)) + .transpose()? + { + crate::codec::qcmp::spawn(socket, shutdown_rx.clone())?; + } + + let Some(port) = udp_port else { + return Ok((either::Left(std::future::pending()), None, None)); + }; + + let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024)); + + let mut worker_sends = Vec::with_capacity(workers); + let mut session_sends = Vec::with_capacity(workers); + for _ in 0..workers { + let queue = crate::net::packet::queue(15, backend)?; + session_sends.push(queue.0.clone()); + worker_sends.push(queue); + } + + let sessions = + SessionPool::new(config.clone(), session_sends, buffer_pool.clone(), backend); + + for (worker_id, ws) in worker_sends.into_iter().enumerate() { + let worker = Listener { + worker_id, + port, + config: config.clone(), + sessions: sessions.clone(), + buffer_pool: buffer_pool.clone(), + }; + + if backend == Backend::Completion { + completion::listen(worker, ws)?; + } else { + poll::listen(worker, ws)?; + } + } + + Ok(( + either::Right(std::future::pending()), + Some(Box::from( + move |_shutdown_rx: &crate::signal::ShutdownRx| {}, + )), + Some(sessions), + )) + } + } +} + +/// Represents the required arguments to run a worker task that +/// processes packets received downstream. +pub struct Listener { + /// ID of the worker. + pub worker_id: usize, + pub port: u16, + pub config: Arc, + pub sessions: Arc, + pub buffer_pool: Arc, +} + +/// The underlying I/O listener responsible for actually sending and receiving +/// packets. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Backend { + /// Polling based driven backend (e.g. `epoll`) + Polling, + /// Async "completion" based driven backend (e.g. `io-uring`) + /// + /// **Note:** Currently only supports Linux. + Completion, + /// Network Interface Controller driven backend (e.g. eBPF XDP) + /// + /// **Note:** Currently only supports Linux. + NetworkInterface, +} + +impl Backend { + pub(crate) fn query(xdp: &crate::cli::XdpOptions) -> Self { + nic::is_available(xdp) + .then_some(Backend::NetworkInterface) + .or_else(|| completion::is_available().then_some(Backend::Completion)) + .unwrap_or(Backend::Polling) + } + + pub(crate) fn queue(self) -> std::io::Result<(Notifier, Receiver)> { + match self { + Self::NetworkInterface => unreachable!("queues are unsupported for NIC backends"), + Self::Polling => Ok(poll::queue()), + Self::Completion => completion::queue(), + } + } + + pub(crate) fn socket(self, addr: std::net::SocketAddr) -> std::io::Result { + match self { + Self::NetworkInterface => unreachable!("sockets are unsupported for NIC backends"), + Self::Polling => super::Socket::polling_from_addr(addr), + Self::Completion => super::Socket::completion_from_addr(addr), + } + } + + pub(crate) fn socket_from_port(self, port: u16) -> std::io::Result { + self.socket((std::net::Ipv6Addr::UNSPECIFIED, port).into()) + } +} + +impl std::fmt::Display for Backend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::NetworkInterface => f.write_str(nic::NAME), + Self::Completion => f.write_str(completion::NAME), + Self::Polling => f.write_str(poll::NAME), + } + } +} + +#[derive(Clone)] +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +pub enum Notifier { + Completion(completion::Notifier), + Polling(poll::Notifier), +} + +impl Notifier { + pub fn notify(&self) { + match self { + Self::Completion(notify) => { + notify.notify(); + } + Self::Polling(notify) => { + let _ = notify.send(true); + } + } + } +} + +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +pub enum Receiver { + Polling(poll::Receiver), + Completion(completion::Receiver), +} + +impl Receiver { + #[cfg_attr(not(target_os = "linux"), allow(dead_code))] + pub fn as_completion_mut(&mut self) -> &mut completion::Receiver { + match self { + Self::Completion(notify) => notify, + Self::Polling(_) => unreachable!("bug: `as_completion` called on `polling` backend"), + } + } + + pub fn as_polling(&self) -> &poll::Receiver { + match self { + Self::Polling(notify) => notify, + Self::Completion(_) => unreachable!("bug: `as_polling` called on `completion` backend"), + } + } + + pub fn as_polling_mut(&mut self) -> &mut poll::Receiver { + match self { + Self::Polling(notify) => notify, + Self::Completion(_) => { + unreachable!("bug: `as_polling_mut` called on `completion` backend") + } + } + } +} diff --git a/src/net/io/completion.rs b/src/net/io/completion.rs new file mode 100644 index 0000000000..e3bbb2ef12 --- /dev/null +++ b/src/net/io/completion.rs @@ -0,0 +1,42 @@ +pub use std::net::UdpSocket as Socket; + +pub const NAME: &str = "io_uring"; + +#[track_caller] +pub fn from_system_socket(socket: super::SystemSocket) -> Socket { + Socket::from(socket.into_inner()) +} + +#[derive(Clone)] +pub struct Notifier(SystemNotifier); + +impl Notifier { + #[cfg_attr(not(target_os = "linux"), allow(clippy::unused_self))] + pub fn notify(&self) { + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + self.0.write(1); + } + } + } +} + +cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + pub(crate) mod io_uring; + pub use io_uring::{listen, is_available, queue}; + + pub type Receiver = io_uring::EventFd; + pub type SystemNotifier = io_uring::EventFdWriter; + } else { + pub type Receiver = (); + pub type SystemNotifier = (); + + pub fn listen(_: super::Listener, _: crate::net::packet::PacketQueue) -> eyre::Result<()> { eyre::bail!("completion based io unavailable on platform") } + pub fn is_available() -> bool { false } + + pub fn queue() -> std::io::Result<(crate::net::io::Notifier, crate::net::io::Receiver)> { + Err(std::io::Error::new(std::io::ErrorKind::Other, "completion based io unavailable on platform")) + } + } +} diff --git a/src/net/io_uring.rs b/src/net/io/completion/io_uring.rs similarity index 88% rename from src/net/io_uring.rs rename to src/net/io/completion/io_uring.rs index 8e48ca7117..fb2add8802 100644 --- a/src/net/io_uring.rs +++ b/src/net/io/completion/io_uring.rs @@ -14,25 +14,99 @@ * limitations under the License. */ -//! We have two cases in the proxy where io-uring is used that are _almost_ identical -//! so this just has a shared implementation of utilities -//! -//! Note there is also the QCMP loop, but that one is simpler and is different -//! enough that it doesn't make sense to share the same code +use std::{ + os::fd::{AsRawFd, FromRawFd}, + sync::Arc, +}; + +use io_uring::{squeue::Entry, types::Fd}; +use socket2::SockAddr; use crate::{ collections::PoolBuffer, - components::proxy::{self, PipelineError}, metrics, - net::{PacketQueue, packet::queue::SendPacket}, + net::{ + PipelineError, + io::Listener, + packet::{PacketQueue, SendPacket}, + sessions::SessionPool, + }, time::UtcTimestamp, }; -use io_uring::{squeue::Entry, types::Fd}; -use socket2::SockAddr; -use std::{ - os::fd::{AsRawFd, FromRawFd}, - sync::Arc, -}; +use eyre::Context as _; + +pub fn is_available() -> bool { + let Err(err) = io_uring::IoUring::new(2) else { + return true; + }; + + if err.kind() == std::io::ErrorKind::PermissionDenied && in_container() { + tracing::error!( + "failed to call `io_uring_setup` due to EPERM ({err}), quilkin seems to be running inside a container meaning this is likely due to the seccomp profile not allowing the syscall" + ); + } else { + tracing::error!("failed to call `io_uring_setup` due to {err}"); + } + + false +} + +fn in_container() -> bool { + let sched = match std::fs::read_to_string("/proc/1/sched") { + Ok(s) => s, + Err(error) => { + tracing::warn!( + %error, + "unable to read /proc/1/sched to determine if quilkin is in a container" + ); + return false; + } + }; + let Some(line) = sched.lines().next() else { + tracing::warn!("/proc/1/sched was empty"); + return false; + }; + let Some(proc) = line.split(' ').next() else { + tracing::warn!("first line of /proc/1/sched was empty"); + return false; + }; + proc != "init" && proc != "systemd" +} + +pub fn queue() -> std::io::Result<(crate::net::io::Notifier, crate::net::io::Receiver)> { + EventFd::new().map(|rx| { + ( + crate::net::io::Notifier::Completion(crate::net::io::completion::Notifier(rx.writer())), + crate::net::io::Receiver::Completion(rx), + ) + }) +} + +pub fn listen( + Listener { + worker_id, + port, + config, + sessions, + buffer_pool, + }: Listener, + pending_sends: crate::net::packet::PacketQueue, +) -> eyre::Result<()> { + let io_loop = IoUringLoop::new(2000, crate::net::io::Socket::completion_from_port(port)?)?; + io_loop + .spawn( + format!("packet-router-{worker_id}"), + PacketProcessorCtx::Router { + config, + sessions, + worker_id, + destinations: Vec::with_capacity(1), + }, + pending_sends, + buffer_pool, + ) + .context("failed to spawn io-uring loop") +} /// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html) /// @@ -84,7 +158,7 @@ impl EventFd { } #[derive(Clone)] -pub(crate) struct EventFdWriter { +pub struct EventFdWriter { fd: i32, } @@ -214,12 +288,12 @@ impl LoopPacket { pub enum PacketProcessorCtx { Router { config: Arc, - sessions: Arc, + sessions: Arc, worker_id: usize, destinations: Vec, }, SessionPool { - pool: Arc, + pool: Arc, port: u16, }, } @@ -243,7 +317,7 @@ fn process_packet( } *last_received_at = Some(received_at); - let ds_packet = proxy::packet_router::DownstreamPacket { + let ds_packet = crate::net::packet::DownstreamPacket { contents: packet.buffer, source: packet.source, }; @@ -457,15 +531,17 @@ impl IoUringLoop { let mut loop_ctx = LoopCtx { sq, - socket_fd: socket.raw_fd(), + socket_fd: io_uring::types::Fd(socket.as_raw_fd()), backlog: Default::default(), loop_packets, tokens, }; loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); - loop_ctx - .push_with_token(pending_sends_event.io_uring_entry(), Token::PendingsSends); + loop_ctx.push_with_token( + pending_sends_event.as_completion_mut().io_uring_entry(), + Token::PendingsSends, + ); // Sync always needs to be called when entries have been pushed // onto the submission queue for the loop to actually function (ie, similar to await on futures) @@ -520,7 +596,7 @@ impl IoUringLoop { Token::PendingsSends => { double_pending_sends = pending_sends.swap(double_pending_sends); loop_ctx.push_with_token( - pending_sends_event.io_uring_entry(), + pending_sends_event.as_completion_mut().io_uring_entry(), Token::PendingsSends, ); diff --git a/src/net/io/nic.rs b/src/net/io/nic.rs new file mode 100644 index 0000000000..8a3ff1fbfe --- /dev/null +++ b/src/net/io/nic.rs @@ -0,0 +1,11 @@ +pub const NAME: &str = "quilkin_xdp"; + +cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + pub mod xdp; + pub use xdp::{listen, is_available}; + } else { + pub fn listen(_config: &std::sync::Arc, _udp_port: u16, _qcmp_port: u16, _xdp: crate::cli::XdpOptions) -> crate::Result> { eyre::bail!("NIC I/O backend unavailable for platform") } + pub fn is_available(_: &crate::cli::XdpOptions) -> bool { false } + } +} diff --git a/src/net/xdp.rs b/src/net/io/nic/xdp.rs similarity index 58% rename from src/net/xdp.rs rename to src/net/io/nic/xdp.rs index 8b31c5ddbb..8be93c9e45 100644 --- a/src/net/xdp.rs +++ b/src/net/io/nic/xdp.rs @@ -1,12 +1,82 @@ #![allow(dead_code)] -use quilkin_xdp::xdp::{ - self, - nic::{NicIndex, NicName}, -}; -use std::sync::Arc; pub mod process; +use std::{ + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; + +use self::{packet::net_types::NetworkU16, slab::Slab}; + +pub use quilkin_xdp::xdp::{ + nic::{NetdevCapabilities, NicIndex, NicName}, + packet, slab, +}; + +use quilkin_xdp::xdp; + +pub fn is_available(xdp: &crate::cli::XdpOptions) -> bool { + let config = XdpConfig { + nic: xdp + .network_interface + .as_deref() + .map_or(NicConfig::Default, NicConfig::Name), + external_port: 0, + qcmp_port: 0, + maximum_packet_memory: xdp.maximum_memory, + require_zero_copy: xdp.force_zerocopy, + require_tx_checksum: xdp.force_tx_checksum_offload, + }; + + config.validate().is_ok() +} + +pub fn listen( + config: &Arc, + udp_port: u16, + qcmp_port: u16, + xdp: crate::cli::XdpOptions, +) -> crate::Result> { + use eyre::{Context as _, ContextCompat as _}; + + // TODO: remove this once it's been more stabilized + if !xdp.force_xdp { + return Ok(None); + } + + let filters = config + .dyn_cfg + .filters() + .context("XDP requires a filter chain")? + .clone(); + let clusters = config + .dyn_cfg + .clusters() + .context("XDP requires a cluster map")? + .clone(); + + let config = process::ConfigState { filters, clusters }; + + tracing::info!(udp_port, qcmp_port, "setting up xdp module"); + let xdp = XdpConfig { + nic: xdp + .network_interface + .as_deref() + .map_or(NicConfig::Default, NicConfig::Name), + external_port: udp_port, + qcmp_port, + maximum_packet_memory: xdp.maximum_memory, + require_zero_copy: xdp.force_zerocopy, + require_tx_checksum: xdp.force_tx_checksum_offload, + }; + + let io_loop = spawn(xdp.load()?, config).context("failed to spawn XDP I/O loop")?; + Ok(Some(Box::new(move |srx: &crate::signal::ShutdownRx| { + io_loop.shutdown(*srx.borrow() == crate::signal::ShutdownKind::Normal); + }))) +} + pub enum NicConfig<'n> { /// Specifies a NIC by name, setup will fail if a NIC with that name doesn't exist Name(&'n str), @@ -60,6 +130,197 @@ impl Default for XdpConfig<'_> { } } +impl XdpConfig<'_> { + fn nic_index(&self) -> Result { + match self.nic { + NicConfig::Default => { + let mut chosen = None; + + for iface in xdp::nic::InterfaceIter::new().map_err(NicUnavailable::Query)? { + if let Some(chosen) = chosen { + if iface != chosen { + return Err(NicUnavailable::NoAvailableDefault); + } + } else { + chosen = Some(iface); + } + } + + chosen.ok_or(NicUnavailable::NoAvailableDefault) + } + NicConfig::Name(name) => { + let cname = std::ffi::CString::new(name).unwrap(); + xdp::nic::NicIndex::lookup_by_name(&cname) + .map_err(NicUnavailable::Query)? + .ok_or_else(|| NicUnavailable::UnknownName(name.to_owned())) + } + NicConfig::Index(index) => Ok(xdp::nic::NicIndex::new(index)), + } + } + + fn validate_device_capabilities( + &self, + nic_index: NicIndex, + name: NicName, + ) -> Result { + let device_caps = nic_index + .query_capabilities() + .map_err(|err| XdpSetupError::NicQuery(name, err))?; + + tracing::debug!(?device_caps, nic = ?nic_index, "XDP features for device"); + + if self.require_zero_copy + && matches!(device_caps.zero_copy, xdp::nic::XdpZeroCopy::Unavailable) + { + tracing::error!(?device_caps, nic = ?nic_index, "XDP features for device"); + return Err(XdpSetupError::ZeroCopyUnavailable(name)); + } + + if self.require_tx_checksum && !device_caps.tx_metadata.checksum() { + tracing::error!(?device_caps, nic = ?nic_index, "XDP features for device"); + return Err(XdpSetupError::TxChecksumUnavailable(name)); + } + + Ok(device_caps) + } + + fn addresses( + nic_index: NicIndex, + name: NicName, + ) -> Result<(Ipv4Addr, Ipv6Addr), XdpSetupError> { + nic_index + .addresses() + .and_then(|(ipv4, ipv6)| { + if ipv4.is_none() && ipv6.is_none() { + Err(std::io::Error::new( + std::io::ErrorKind::AddrNotAvailable, + "neither an ipv4 nor ipv6 address could be determined for the device", + )) + } else { + Ok(( + ipv4.unwrap_or(std::net::Ipv4Addr::new(0, 0, 0, 0)), + ipv6.unwrap_or(std::net::Ipv6Addr::from_bits(0)), + )) + } + }) + .map_err(|err| XdpSetupError::AddressQuery(name, err)) + } + + fn packet_count( + &self, + name: NicName, + device_caps: &NetdevCapabilities, + ) -> Result { + // Bit arbitrary, but set the floor at 128 packets per umem + const MINIMUM_UMEM_COUNT: u64 = 128; + // We don't support unaligned chunks, so this size can only be 2k or 4k, + // and we only need 2k since we only care about non-fragmented UDP packets + const PACKET_SIZE: u64 = 2 * 1024; + + if let Some(max) = self.maximum_packet_memory { + let bytes_per_socket = max / device_caps.queue_count as u64; + let packet_count = (bytes_per_socket / PACKET_SIZE).next_power_of_two(); + if MINIMUM_UMEM_COUNT > packet_count { + fn byte_units(b: u64) -> (f64, &'static str) { + let mut units = b as f64; + let mut unit = 0; + const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB"]; + + while units > 1024.0 { + units /= 1024.0; + unit += 1; + } + + (units, UNITS[unit]) + } + + let (max, xunit) = byte_units(max); + let (min, nunit) = + byte_units(MINIMUM_UMEM_COUNT * PACKET_SIZE * device_caps.queue_count as u64); + + return Err(XdpSetupError::MinimumMemoryRequirementsExceeded { + max, + xunit, + min, + nunit, + nic: name, + queue_count: device_caps.queue_count, + }); + } + + Ok(packet_count as u32) + } else { + Ok(2 * 1024) + } + } + + fn validate(&self) -> Result<(), XdpSetupError> { + let nic_index = self.nic_index()?; + let name = nic_index + .name() + .map_err(|_err| NicUnavailable::UnknownIndex(nic_index.into()))?; + + tracing::info!(nic = ?nic_index, "selected NIC"); + self.validate_device_capabilities(nic_index, name)?; + Self::addresses(nic_index, name)?; + Ok(()) + } + + /// Attempts to setup XDP by querying NIC support and allocating ring buffers + /// based on user configuration, failing if requirements cannot be met + /// + /// This function currently only supports one mode of operation, which is that + /// a socket is bound to every available queue on the NIC, and when [`spawn`] + /// is invoked, each socket is processed in its own thread + /// + /// Binding to fewer queues is possible in the future but requires additional + /// work in the `xdp` crate + pub fn load(self) -> Result { + let nic_index = self.nic_index()?; + let name = nic_index + .name() + .map_err(|_err| NicUnavailable::UnknownIndex(nic_index.into()))?; + + tracing::info!(nic = ?nic_index, "selected NIC"); + let device_caps = self.validate_device_capabilities(nic_index, name)?; + let (ipv4, ipv6) = Self::addresses(nic_index, name)?; + let packet_count = self.packet_count(name, &device_caps)?; + + let mut ebpf_prog = quilkin_xdp::EbpfProgram::load(self.external_port, self.qcmp_port)?; + + let umem_cfg = xdp::umem::UmemCfgBuilder { + frame_size: xdp::umem::FrameSize::TwoK, + // Provide enough headroom so that we can convert an ipv4 header to ipv6 + // header without needing to copy any bytes. note this doesn't take into + // account if a filter adds or removes bytes from the beginning of the + // data payload + head_room: (xdp::packet::net_types::Ipv6Hdr::LEN - xdp::packet::net_types::Ipv4Hdr::LEN) + as u32, + frame_count: packet_count, + // TODO: This should be done in the type system so we can avoid logic + // that doesn't change during the course of operation, but for now just + // do it at runtime + tx_checksum: device_caps.tx_metadata.checksum(), + ..Default::default() + } + .build()?; + + let ring_cfg = xdp::RingConfigBuilder::default().build()?; + let workers = + ebpf_prog.create_and_bind_sockets(nic_index, umem_cfg, &device_caps, ring_cfg)?; + + Ok(XdpWorkers { + ebpf_prog, + workers, + nic: nic_index, + external_port: self.external_port.into(), + qcmp_port: self.qcmp_port.into(), + ipv4, + ipv6, + }) + } +} + pub struct XdpWorkers { ebpf_prog: quilkin_xdp::EbpfProgram, workers: Vec, @@ -121,157 +382,6 @@ pub enum XdpSpawnError { XdpAttach(#[from] quilkin_xdp::aya::programs::ProgramError), } -/// Attempts to setup XDP by querying NIC support and allocating ring buffers -/// based on user configuration, failing if requirements cannot be met -/// -/// This function currently only supports one mode of operation, which is that -/// a socket is bound to every available queue on the NIC, and when [`spawn`] -/// is invoked, each socket is processed in its own thread -/// -/// Binding to fewer queues is possible in the future but requires additional -/// work in the `xdp` crate -pub fn setup_xdp_io(config: XdpConfig<'_>) -> Result { - let nic_index = match config.nic { - NicConfig::Default => { - let mut chosen = None; - - for iface in xdp::nic::InterfaceIter::new().map_err(NicUnavailable::Query)? { - if let Some(chosen) = chosen { - if iface != chosen { - return Err(NicUnavailable::NoAvailableDefault.into()); - } - } else { - chosen = Some(iface); - } - } - - chosen.ok_or(NicUnavailable::NoAvailableDefault)? - } - NicConfig::Name(name) => { - let cname = std::ffi::CString::new(name).unwrap(); - xdp::nic::NicIndex::lookup_by_name(&cname) - .map_err(NicUnavailable::Query)? - .ok_or_else(|| NicUnavailable::UnknownName(name.to_owned()))? - } - NicConfig::Index(index) => xdp::nic::NicIndex::new(index), - }; - - let name = nic_index - .name() - .map_err(|_err| NicUnavailable::UnknownIndex(nic_index.into()))?; - - tracing::info!(nic = ?nic_index, "selected NIC"); - - let device_caps = nic_index - .query_capabilities() - .map_err(|err| XdpSetupError::NicQuery(name, err))?; - - tracing::debug!(?device_caps, nic = ?nic_index, "XDP features for device"); - - if config.require_zero_copy - && matches!(device_caps.zero_copy, xdp::nic::XdpZeroCopy::Unavailable) - { - tracing::error!(?device_caps, nic = ?nic_index, "XDP features for device"); - return Err(XdpSetupError::ZeroCopyUnavailable(name)); - } - - if config.require_tx_checksum && !device_caps.tx_metadata.checksum() { - tracing::error!(?device_caps, nic = ?nic_index, "XDP features for device"); - return Err(XdpSetupError::TxChecksumUnavailable(name)); - } - - let (ipv4, ipv6) = nic_index - .addresses() - .and_then(|(ipv4, ipv6)| { - if ipv4.is_none() && ipv6.is_none() { - Err(std::io::Error::new( - std::io::ErrorKind::AddrNotAvailable, - "neither an ipv4 nor ipv6 address could be determined for the device", - )) - } else { - Ok(( - ipv4.unwrap_or(std::net::Ipv4Addr::new(0, 0, 0, 0)), - ipv6.unwrap_or(std::net::Ipv6Addr::from_bits(0)), - )) - } - }) - .map_err(|err| XdpSetupError::AddressQuery(name, err))?; - - // Bit arbitrary, but set the floor at 128 packets per umem - const MINIMUM_UMEM_COUNT: u64 = 128; - // We don't support unaligned chunks, so this size can only be 2k or 4k, - // and we only need 2k since we only care about non-fragmented UDP packets - const PACKET_SIZE: u64 = 2 * 1024; - - let packet_count = if let Some(max) = config.maximum_packet_memory { - let bytes_per_socket = max / device_caps.queue_count as u64; - let packet_count = (bytes_per_socket / PACKET_SIZE).next_power_of_two(); - if MINIMUM_UMEM_COUNT > packet_count { - fn byte_units(b: u64) -> (f64, &'static str) { - let mut units = b as f64; - let mut unit = 0; - const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB"]; - - while units > 1024.0 { - units /= 1024.0; - unit += 1; - } - - (units, UNITS[unit]) - } - - let (max, xunit) = byte_units(max); - let (min, nunit) = - byte_units(MINIMUM_UMEM_COUNT * PACKET_SIZE * device_caps.queue_count as u64); - - return Err(XdpSetupError::MinimumMemoryRequirementsExceeded { - max, - xunit, - min, - nunit, - nic: name, - queue_count: device_caps.queue_count, - }); - } - - packet_count as u32 - } else { - 2 * 1024 - }; - - let mut ebpf_prog = quilkin_xdp::EbpfProgram::load(config.external_port, config.qcmp_port)?; - - let umem_cfg = xdp::umem::UmemCfgBuilder { - frame_size: xdp::umem::FrameSize::TwoK, - // Provide enough headroom so that we can convert an ipv4 header to ipv6 - // header without needing to copy any bytes. note this doesn't take into - // account if a filter adds or removes bytes from the beginning of the - // data payload - head_room: (xdp::packet::net_types::Ipv6Hdr::LEN - xdp::packet::net_types::Ipv4Hdr::LEN) - as u32, - frame_count: packet_count, - // TODO: This should be done in the type system so we can avoid logic - // that doesn't change during the course of operation, but for now just - // do it at runtime - tx_checksum: device_caps.tx_metadata.checksum(), - ..Default::default() - } - .build()?; - - let ring_cfg = xdp::RingConfigBuilder::default().build()?; - let workers = ebpf_prog.create_and_bind_sockets(nic_index, umem_cfg, &device_caps, ring_cfg)?; - - Ok(XdpWorkers { - ebpf_prog, - workers, - nic: nic_index, - external_port: config.external_port.into(), - qcmp_port: config.qcmp_port.into(), - ipv4, - ipv6, - }) -} - pub struct XdpLoop { threads: Vec>, ebpf_prog: quilkin_xdp::EbpfProgram, @@ -381,7 +491,6 @@ pub fn spawn(workers: XdpWorkers, config: process::ConfigState) -> Result::new(); let mut tx_slab = xdp::slab::StackSlab::<{ BATCH_SIZE << 2 }>::new(); let mut pending_sends = 0; diff --git a/src/net/xdp/process.rs b/src/net/io/nic/xdp/process.rs similarity index 99% rename from src/net/xdp/process.rs rename to src/net/io/nic/xdp/process.rs index 742cbd4816..9fbf49d5df 100644 --- a/src/net/xdp/process.rs +++ b/src/net/io/nic/xdp/process.rs @@ -1,14 +1,13 @@ -use crate::{ - components::proxy::{PipelineError, sessions::inner_metrics as session_metrics}, - filters::{self, Filter as _}, - metrics::{self, AsnInfo}, - net::{ - EndpointAddress, - maxmind_db::{self, IpNetEntry}, +use std::{ + collections::hash_map::Entry, + net::{IpAddr, SocketAddr}, + sync::{ + Arc, + atomic::{AtomicU16, Ordering}, }, - time::UtcTimestamp, + time::Instant, }; -pub use quilkin_xdp::xdp; + use quilkin_xdp::xdp::{ Umem, packet::{ @@ -17,14 +16,17 @@ use quilkin_xdp::xdp::{ }, slab::{Slab, StackSlab}, }; -use std::{ - collections::hash_map::Entry, - net::{IpAddr, SocketAddr}, - sync::{ - Arc, - atomic::{AtomicU16, Ordering}, + +use crate::{ + filters::{self, Filter as _}, + metrics::{self, AsnInfo}, + net::{ + EndpointAddress, + error::PipelineError, + maxmind_db::{self, IpNetEntry}, + sessions::inner_metrics as session_metrics, }, - time::Instant, + time::UtcTimestamp, }; /// Wrapper around the actual packet buffer and the UDP metadata it parsed to @@ -833,8 +835,7 @@ fn process_qcmp_packet( #[cfg(test)] mod test { use super::*; - use quilkin_xdp::xdp::packet::Pod; - use xdp::packet::net_types as nt; + use quilkin_xdp::xdp::packet::{Pod, net_types as nt}; #[test] fn xdp_buffer_manipulation() { @@ -862,7 +863,7 @@ mod test { ); let mut data = [0u8; 2048]; - let mut buffer = xdp::Packet::testing_new(&mut data); + let mut buffer = quilkin_xdp::xdp::Packet::testing_new(&mut data); buffer.adjust_tail(headers.data.start as _).unwrap(); headers.set_packet_headers(&mut buffer).unwrap(); buffer.insert(headers.data.start, &payload).unwrap(); diff --git a/src/net/io/poll.rs b/src/net/io/poll.rs new file mode 100644 index 0000000000..ad7fe6bdf9 --- /dev/null +++ b/src/net/io/poll.rs @@ -0,0 +1,149 @@ +/* + * Copyright 2024 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub type Notifier = tokio::sync::watch::Sender; +pub type Receiver = tokio::sync::watch::Receiver; +pub use tokio::net::UdpSocket as Socket; + +pub const NAME: &str = "tokio::net"; + +#[track_caller] +pub fn from_system_socket(socket: super::SystemSocket) -> Socket { + Socket::from_std(std::net::UdpSocket::from(socket.into_inner())).unwrap() +} + +#[track_caller] +pub fn queue() -> (super::Notifier, super::Receiver) { + let (tx, rx) = tokio::sync::watch::channel(true); + + (super::Notifier::Polling(tx), super::Receiver::Polling(rx)) +} + +pub fn listen( + super::Listener { + worker_id, + port, + config, + sessions, + buffer_pool, + }: super::Listener, + packet_queue: crate::net::packet::PacketQueue, +) -> eyre::Result<()> { + let thread_span = uring_span!(tracing::debug_span!("receiver", id = worker_id).or_current()); + let (tx, mut rx) = tokio::sync::oneshot::channel(); + + let worker = uring_spawn!(thread_span, async move { + crate::metrics::game_traffic_tasks().inc(); + let mut last_received_at = None; + let socket = std::sync::Arc::new(crate::net::Socket::polling_from_port(port).unwrap()); + + tracing::trace!(port, "bound worker"); + let send_socket = socket.clone(); + + let inner_task = async move { + let (packet_queue, mut sends_rx) = packet_queue; + let mut sends_double_buffer = Vec::with_capacity(packet_queue.capacity()); + + while sends_rx.as_polling_mut().changed().await.is_ok() { + if !*sends_rx.as_polling().borrow() { + tracing::trace!("io loop shutdown requested"); + break; + } + + sends_double_buffer = packet_queue.swap(sends_double_buffer); + + for packet in sends_double_buffer.drain(..sends_double_buffer.len()) { + let result = send_socket + .send_to(packet.data, packet.destination.as_socket().unwrap()) + .await; + let asn_info = packet.asn_info.as_ref().into(); + match result { + Ok(size) => { + crate::metrics::packets_total(crate::metrics::WRITE, &asn_info).inc(); + crate::metrics::bytes_total(crate::metrics::WRITE, &asn_info) + .inc_by(size as u64); + } + Err(error) => { + let source = error.to_string(); + crate::metrics::errors_total(crate::metrics::WRITE, &source, &asn_info) + .inc(); + crate::metrics::packets_dropped_total( + crate::metrics::WRITE, + &source, + &asn_info, + ) + .inc(); + } + } + } + } + + let _ = tx.send(()); + }; + + tokio::spawn(inner_task); + + let mut destinations = Vec::with_capacity(1); + + loop { + // Initialize a buffer for the UDP packet. We use the maximum size of a UDP + // packet, which is the maximum value of 16 a bit integer. + let mut buffer = buffer_pool.clone().alloc(); + + tokio::select! { + result = socket.recv_from(&mut *buffer) => { + let received_at = crate::time::UtcTimestamp::now(); + + match result { + Ok((_size, mut source)) => { + source.set_ip(source.ip().to_canonical()); + let packet = crate::net::packet::DownstreamPacket { contents: buffer, source }; + + if let Some(last_received_at) = last_received_at { + crate::metrics::packet_jitter( + crate::metrics::READ, + &crate::metrics::EMPTY, + ) + .set((received_at - last_received_at).nanos()); + } + last_received_at = Some(received_at); + + packet.process( + worker_id, + &config, + &sessions, + &mut destinations, + ); + } + Err(error) => { + tracing::error!(%error, "error receiving packet"); + return; + } + } + } + _ = &mut rx => { + crate::metrics::game_traffic_task_closed().inc(); + tracing::debug!("Closing downstream socket loop, shutdown requested"); + return; + } + } + } + }); + + use eyre::WrapErr as _; + worker.recv().context("failed to spawn receiver task")?; + Ok(()) +} diff --git a/src/net/io/socket.rs b/src/net/io/socket.rs new file mode 100644 index 0000000000..ddb1c7df90 --- /dev/null +++ b/src/net/io/socket.rs @@ -0,0 +1,183 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use socket2::{Protocol, Type}; + +use super::{completion, poll}; + +#[derive(Debug)] +#[repr(transparent)] +pub struct SystemSocket(socket2::Socket); + +impl SystemSocket { + pub fn new(socket: socket2::Socket) -> Self { + Self(socket) + } + + #[track_caller] + pub fn listen() -> std::io::Result { + Self::from_port(0) + } + + #[track_caller] + pub fn from_port(port: u16) -> std::io::Result { + Self::from_addr((std::net::Ipv6Addr::UNSPECIFIED, port).into()) + } + + #[track_caller] + fn from_addr(addr: SocketAddr) -> std::io::Result { + let domain = match addr { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }; + + let sock = socket2::Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; + + cfg_if::cfg_if! { + if #[cfg(target_family = "windows")] { + sock.set_reuse_address(true)?; + } else { + sock.set_reuse_port(true)?; + } + } + + sock.set_nonblocking(true)?; + if domain == socket2::Domain::IPV6 { + // be explicit so we can have dual stack sockets. + sock.set_only_v6(false)?; + } + sock.bind(&addr.into())?; + + Ok(Self(sock)) + } + + #[inline] + #[track_caller] + pub fn port(&self) -> u16 { + match self.0.local_addr().unwrap().as_socket().unwrap() { + SocketAddr::V4(addr) => addr.port(), + SocketAddr::V6(addr) => addr.port(), + } + } + + pub fn into_inner(self) -> socket2::Socket { + self.0 + } +} + +impl std::ops::Deref for SystemSocket { + type Target = socket2::Socket; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// An ipv6 socket that can accept and send data from either a local ipv4 address or ipv6 address +/// with port reuse enabled and `only_v6` set to false. +#[derive(Debug)] +pub struct Socket { + socket: SocketKind, + local_addr: SocketAddr, +} + +#[derive(Debug)] +enum SocketKind { + Completion(completion::Socket), + Polling(poll::Socket), +} + +impl Socket { + pub fn polling(socket: SystemSocket) -> Self { + let socket = poll::from_system_socket(socket); + let local_addr = socket.local_addr().unwrap(); + Self { + socket: SocketKind::Polling(socket), + local_addr, + } + } + + pub fn polling_from_addr(addr: SocketAddr) -> std::io::Result { + SystemSocket::from_addr(addr).map(Self::polling) + } + + pub fn polling_from_port(port: u16) -> std::io::Result { + Self::polling_from_addr((std::net::Ipv6Addr::UNSPECIFIED, port).into()) + } + + pub fn polling_listen() -> std::io::Result { + Self::polling_from_port(0) + } + + pub fn completion(socket: SystemSocket) -> Self { + let socket = completion::from_system_socket(socket); + let local_addr = socket.local_addr().unwrap(); + Self { + socket: SocketKind::Completion(socket), + local_addr, + } + } + + pub fn completion_from_addr(addr: SocketAddr) -> std::io::Result { + SystemSocket::from_addr(addr).map(Self::completion) + } + + pub fn completion_from_port(port: u16) -> std::io::Result { + Self::completion_from_addr((std::net::Ipv6Addr::UNSPECIFIED, port).into()) + } + + pub fn completion_listen() -> std::io::Result { + Self::completion_from_port(0) + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + pub fn local_ipv4_addr(&self) -> SocketAddr { + match self.local_addr { + SocketAddr::V4(_) => self.local_addr, + SocketAddr::V6(_) => (Ipv4Addr::UNSPECIFIED, self.local_addr.port()).into(), + } + } + + pub fn local_ipv6_addr(&self) -> SocketAddr { + match self.local_addr { + SocketAddr::V4(v4addr) => SocketAddr::new( + IpAddr::V6(v4addr.ip().to_ipv6_mapped()), + self.local_addr.port(), + ), + SocketAddr::V6(_) => self.local_addr, + } + } + + pub async fn recv_from>( + &self, + mut buf: B, + ) -> std::io::Result<(usize, SocketAddr)> { + match &self.socket { + SocketKind::Completion(socket) => socket.recv_from(&mut buf), + SocketKind::Polling(socket) => socket.recv_from(&mut buf).await, + } + } + + pub async fn send_to>( + &self, + buf: B, + target: SocketAddr, + ) -> std::io::Result { + match &self.socket { + SocketKind::Completion(socket) => socket.send_to(&buf, target), + SocketKind::Polling(socket) => socket.send_to(&buf, target).await, + } + } +} + +#[cfg(unix)] +impl std::os::fd::AsRawFd for Socket { + fn as_raw_fd(&self) -> i32 { + match &self.socket { + SocketKind::Completion(socket) => socket.as_raw_fd(), + SocketKind::Polling(socket) => socket.as_raw_fd(), + } + } +} diff --git a/src/net/packet.rs b/src/net/packet.rs index c9fe3ae997..fb11d166bb 100644 --- a/src/net/packet.rs +++ b/src/net/packet.rs @@ -1,3 +1,199 @@ pub mod queue; -pub use self::queue::{PacketQueue, PacketQueueSender, queue}; +use crate::{ + Config, + filters::{Filter as _, ReadContext}, + metrics, +}; +use std::{net::SocketAddr, sync::Arc}; + +use super::{ + error::PipelineError, + sessions::{SessionKey, SessionManager}, +}; + +pub use queue::{PacketQueue, PacketQueueSender, SendPacket, queue}; + +/// Representation of an immutable set of bytes pulled from the network, this trait +/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) +/// +/// Use [`PacketDataMut`] if you need a mutable representation. +pub trait PacketData: Sized { + /// Returns the underlying slice of bytes representing the packet. + fn as_slice(&self) -> &[u8]; + + /// Returns the size of the packet. + fn len(&self) -> usize; + + /// Returns whether the given packet is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +/// Representation of an mutable set of bytes pulled from the network, this trait +/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) +pub trait PacketDataMut: Sized + PacketData { + type FrozenPacket: PacketData; + fn remove_head(&mut self, length: usize); + fn remove_tail(&mut self, length: usize); + fn extend_head(&mut self, bytes: &[u8]); + fn extend_tail(&mut self, bytes: &[u8]); + /// Returns an immutable version of the packet, this allows certain types + /// return a type that can be more cheaply cloned and shared. + fn freeze(self) -> Self::FrozenPacket; +} + +/// Packet received from local port +pub(crate) struct DownstreamPacket

{ + pub(crate) contents: P, + pub(crate) source: SocketAddr, +} + +impl DownstreamPacket

{ + #[inline] + pub(crate) fn process>( + self, + worker_id: usize, + config: &Arc, + sessions: &S, + destinations: &mut Vec, + ) { + tracing::trace!( + id = worker_id, + size = self.contents.len(), + source = %self.source, + "received packet from downstream" + ); + + let timer = metrics::processing_time(metrics::READ).start_timer(); + if let Err(error) = self.process_inner(config, sessions, destinations) { + let discriminant = error.discriminant(); + + error.inc_system_errors_total(metrics::READ, &metrics::EMPTY); + metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); + } + + timer.stop_and_record(); + } + + /// Processes a packet by running it through the filter chain. + #[inline] + fn process_inner>( + self, + config: &Arc, + sessions: &S, + destinations: &mut Vec, + ) -> Result<(), PipelineError> { + let Some(clusters) = config + .dyn_cfg + .clusters() + .filter(|c| c.read().has_endpoints()) + else { + tracing::trace!("no upstream endpoints"); + return Err(PipelineError::NoUpstreamEndpoints); + }; + + let cm = clusters.clone_value(); + let Some(filters) = config.dyn_cfg.filters() else { + return Err(PipelineError::Filter(crate::filters::FilterError::Custom( + "no filters loaded", + ))); + }; + + #[cfg(not(debug_assertions))] + match self.source.ip() { + std::net::IpAddr::V4(ipv4) => { + if ipv4.is_loopback() || ipv4.is_multicast() || ipv4.is_broadcast() { + return Err(PipelineError::DisallowedSourceIP(self.source.ip())); + } + } + std::net::IpAddr::V6(ipv6) => { + if ipv6.is_loopback() || ipv6.is_multicast() { + return Err(PipelineError::DisallowedSourceIP(self.source.ip())); + } + } + } + + let mut context = ReadContext::new(&cm, self.source.into(), self.contents, destinations); + filters.read(&mut context).map_err(PipelineError::Filter)?; + + let ReadContext { contents, .. } = context; + + // Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer + // into an immutable one with its own internal arc so it can be cloned + // cheaply and returned to the pool once all references are dropped + let contents = contents.freeze(); + + for epa in destinations.drain(0..) { + let session_key = SessionKey { + source: self.source, + dest: epa.to_socket_addr()?, + }; + + sessions.send(session_key, &contents)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + #![cfg(not(debug_assertions))] + + use quilkin_xds::locality::Locality; + + use crate::collections::BufferPool; + use crate::net::{Endpoint, io::Backend, sessions::SessionPool}; + use crate::test::alloc_buffer; + + use super::*; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use std::net::{SocketAddrV4, SocketAddrV6}; + + // Ensure we disallow certain source IP addresses to protect against UDP amplification attacks + #[tokio::test] + async fn disallowed_ips() { + let nl1 = Locality::with_region("nl-1"); + let endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into()); + + let config = Arc::new(Config::default_agent().cluster( + None, + Some(nl1.clone()), + [endpoint.clone()].into(), + )); + let buffer_pool = Arc::new(BufferPool::new(1, 10)); + let session_manager = SessionPool::new( + config.clone(), + vec![], + buffer_pool.clone(), + Backend::Polling, + ); + + let packet_data: [u8; 4] = [1, 2, 3, 4]; + for ip in [ + IpAddr::V4(Ipv4Addr::LOCALHOST), + IpAddr::V4(Ipv4Addr::BROADCAST), + // multicast = 224.0.0.0/4 + IpAddr::V4(Ipv4Addr::new(224, 0, 0, 0)), + IpAddr::V4(Ipv4Addr::new(239, 255, 255, 255)), + IpAddr::V6(Ipv6Addr::LOCALHOST), + // multicast = any address starting with 0xff + IpAddr::V6(Ipv6Addr::new(0xff00, 0, 0, 0, 0, 0, 0, 0)), + ] { + let packet = DownstreamPacket { + contents: alloc_buffer(packet_data), + source: match ip { + IpAddr::V4(ipv4) => SocketAddr::V4(SocketAddrV4::new(ipv4, 0)), + IpAddr::V6(ipv6) => SocketAddr::V6(SocketAddrV6::new(ipv6, 0, 0, 0)), + }, + }; + + let mut endpoints = vec![endpoint.address.clone()]; + let res = packet.process_inner(&config, &session_manager, &mut endpoints); + + assert_eq!(res, Err(PipelineError::DisallowedSourceIP(ip))); + } + } +} diff --git a/src/net/packet/queue.rs b/src/net/packet/queue.rs index 35800c7538..cb039927ef 100644 --- a/src/net/packet/queue.rs +++ b/src/net/packet/queue.rs @@ -1,9 +1,9 @@ use std::sync::Arc; -pub type PacketQueue = (PacketQueueSender, PacketQueueReceiver); +pub type PacketQueue = (PacketQueueSender, crate::net::io::Receiver); -pub fn queue(capacity: usize) -> std::io::Result { - let (notify, rx) = packet_queue()?; +pub fn queue(capacity: usize, backend: crate::net::io::Backend) -> std::io::Result { + let (notify, rx) = backend.queue()?; Ok(( PacketQueueSender { @@ -21,7 +21,7 @@ pub fn queue(capacity: usize) -> std::io::Result { #[derive(Clone)] pub struct PacketQueueSender { packets: Arc>>, - notify: PacketQueueNotifier, + notify: crate::net::io::Notifier, } impl PacketQueueSender { @@ -35,7 +35,7 @@ impl PacketQueueSender { #[inline] pub fn push(&self, packet: SendPacket) { self.packets.lock().push(packet); - push(&self.notify); + self.notify.notify(); } /// Swaps the current queue with an empty one so we only lock for a pointer swap @@ -54,32 +54,3 @@ pub struct SendPacket { /// The asn info for the sender, used for metrics pub asn_info: Option, } - -cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - pub type PacketQueueReceiver = crate::net::io_uring::EventFd; - type PacketQueueNotifier = crate::net::io_uring::EventFdWriter; - - fn packet_queue() -> std::io::Result<(PacketQueueNotifier, PacketQueueReceiver)> { - let rx = crate::net::io_uring::EventFd::new()?; - Ok((rx.writer(), rx)) - } - - #[inline] - fn push(notify: &PacketQueueNotifier) { - notify.write(1); - } - } else { - pub type PacketQueueReceiver = tokio::sync::watch::Receiver; - type PacketQueueNotifier = tokio::sync::watch::Sender; - - fn packet_queue() -> std::io::Result<(PacketQueueNotifier, PacketQueueReceiver)> { - Ok(tokio::sync::watch::channel(true)) - } - - #[inline] - fn push(notify: &PacketQueueNotifier) { - let _ = notify.send(true); - } - } -} diff --git a/src/net/phoenix.rs b/src/net/phoenix.rs index 1dfb7559a2..7d5eee639d 100644 --- a/src/net/phoenix.rs +++ b/src/net/phoenix.rs @@ -632,8 +632,6 @@ impl Node { #[cfg(test)] mod tests { - use crate::net::raw_socket_with_reuse; - use super::*; use std::collections::HashMap; use std::collections::HashSet; @@ -879,7 +877,7 @@ mod tests { ); let (_tx, rx) = crate::signal::channel(Default::default()); - let socket = raw_socket_with_reuse(qcmp_port).unwrap(); + let socket = crate::net::Socket::polling_from_port(qcmp_port).unwrap(); crate::codec::qcmp::spawn(socket, rx.clone()).unwrap(); tokio::time::sleep(Duration::from_millis(150)).await; diff --git a/src/components/proxy/sessions.rs b/src/net/sessions.rs similarity index 97% rename from src/components/proxy/sessions.rs rename to src/net/sessions.rs index 76ea8f7cb8..62774e940a 100644 --- a/src/components/proxy/sessions.rs +++ b/src/net/sessions.rs @@ -31,9 +31,9 @@ use crate::{ filters::Filter, metrics, net::{ - PacketQueueSender, + error::PipelineError, maxmind_db::{IpNetEntry, MetricsIpNetEntry}, - queue::SendPacket, + packet::{PacketQueueSender, SendPacket}, }, time::UtcTimestamp, }; @@ -56,7 +56,7 @@ cfg_if::cfg_if! { /// tracking metrics and other information about the session. pub trait SessionManager { type Packet: crate::filters::Packet; - fn send(&self, key: SessionKey, contents: &Self::Packet) -> Result<(), super::PipelineError>; + fn send(&self, key: SessionKey, contents: &Self::Packet) -> Result<(), PipelineError>; } #[derive(PartialEq, Eq, Hash)] @@ -105,6 +105,7 @@ pub struct SessionPool { config: Arc, downstream_sends: Vec, downstream_index: atomic::AtomicUsize, + backend: crate::net::io::Backend, } /// The wrapper struct responsible for holding all of the socket related mappings. @@ -124,18 +125,20 @@ impl SessionPool { config: Arc, downstream_sends: Vec, buffer_pool: Arc, + backend: crate::net::io::Backend, ) -> Arc { const SESSION_TIMEOUT_SECONDS: Duration = Duration::from_secs(60); const SESSION_EXPIRY_POLL_INTERVAL: Duration = Duration::from_secs(60); Arc::new(Self { + backend, + buffer_pool, config, + downstream_index: atomic::AtomicUsize::new(0), + downstream_sends, ports_to_sockets: <_>::default(), - storage: <_>::default(), session_map: SessionMap::new(SESSION_TIMEOUT_SECONDS, SESSION_EXPIRY_POLL_INTERVAL), - buffer_pool, - downstream_sends, - downstream_index: atomic::AtomicUsize::new(0), + storage: <_>::default(), }) } @@ -143,16 +146,16 @@ impl SessionPool { fn create_new_session_from_new_socket<'pool>( self: &'pool Arc, key: SessionKey, - ) -> Result<(Option, PacketQueueSender), super::PipelineError> { + ) -> Result<(Option, PacketQueueSender), PipelineError> { tracing::trace!(source=%key.source, dest=%key.dest, "creating new socket for session"); - let raw_socket = crate::net::raw_socket_with_reuse(0)?; + let raw_socket = crate::net::SystemSocket::listen()?; let port = raw_socket .local_addr()? .as_socket() .ok_or(SessionError::SocketAddressUnavailable)? .port(); - let (pending_sends, srecv) = crate::net::queue(15)?; + let (pending_sends, srecv) = crate::net::packet::queue(15, self.backend)?; self.clone() .spawn_session(raw_socket, port, (pending_sends.clone(), srecv))?; @@ -231,7 +234,7 @@ impl SessionPool { pub(crate) fn get<'pool>( self: &'pool Arc, key @ SessionKey { dest, .. }: SessionKey, - ) -> Result<(Option, PacketQueueSender), super::PipelineError> { + ) -> Result<(Option, PacketQueueSender), PipelineError> { tracing::trace!(source=%key.source, dest=%key.dest, "SessionPool::get"); // If we already have a session for the key pairing, return that session. if let Some(entry) = self.session_map.get(&key) { @@ -294,7 +297,7 @@ impl SessionPool { key: SessionKey, pending_sends: PacketQueueSender, socket_port: u16, - ) -> Result<(Option, PacketQueueSender), super::PipelineError> { + ) -> Result<(Option, PacketQueueSender), PipelineError> { tracing::trace!(source=%key.source, dest=%key.dest, "reusing socket for session"); let asn_info = { let mut storage = self.storage.write(); @@ -378,7 +381,7 @@ impl SessionPool { self: &Arc, key: SessionKey, packet: FrozenPoolBuffer, - ) -> Result<(), super::PipelineError> { + ) -> Result<(), PipelineError> { self.send_inner(key, packet)?; Ok(()) } @@ -388,7 +391,7 @@ impl SessionPool { self: &Arc, key: SessionKey, packet: FrozenPoolBuffer, - ) -> Result { + ) -> Result { let (asn_info, sender) = self.get(key)?; sender.push(SendPacket { @@ -459,7 +462,7 @@ impl SessionPool { impl SessionManager for Arc { type Packet = FrozenPoolBuffer; - fn send(&self, key: SessionKey, contents: &Self::Packet) -> Result<(), super::PipelineError> { + fn send(&self, key: SessionKey, contents: &Self::Packet) -> Result<(), PipelineError> { SessionPool::send(self, key, contents.clone()) } } @@ -575,12 +578,14 @@ mod tests { use std::sync::Arc; async fn new_pool() -> (Arc, PacketQueueSender) { - let (pending_sends, _srecv) = crate::net::queue(1).unwrap(); + const BACKEND: crate::net::io::Backend = crate::net::io::Backend::Polling; + let (pending_sends, _srecv) = crate::net::packet::queue(1, BACKEND).unwrap(); ( SessionPool::new( Arc::new(Config::default_agent()), vec![pending_sends.clone()], Arc::new(BufferPool::default()), + BACKEND, ), pending_sends, ) diff --git a/src/components/proxy/sessions/inner_metrics.rs b/src/net/sessions/inner_metrics.rs similarity index 100% rename from src/components/proxy/sessions/inner_metrics.rs rename to src/net/sessions/inner_metrics.rs diff --git a/src/components/proxy/sessions/io_uring.rs b/src/net/sessions/io_uring.rs similarity index 79% rename from src/components/proxy/sessions/io_uring.rs rename to src/net/sessions/io_uring.rs index 981483062a..5237d83e8a 100644 --- a/src/components/proxy/sessions/io_uring.rs +++ b/src/net/sessions/io_uring.rs @@ -14,28 +14,24 @@ * limitations under the License. */ -use crate::components::proxy; use std::sync::Arc; +use crate::net::{PipelineError, SystemSocket, io::completion::io_uring, packet::PacketQueue}; + static SESSION_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); impl super::SessionPool { pub(super) fn spawn_session( self: Arc, - raw_socket: socket2::Socket, + raw_socket: SystemSocket, port: u16, - pending_sends: crate::net::PacketQueue, - ) -> Result<(), proxy::PipelineError> { - use crate::net::io_uring; - + pending_sends: PacketQueue, + ) -> Result<(), PipelineError> { let pool = self; let id = SESSION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let _thread_span = uring_span!(tracing::debug_span!("session", id).or_current()); - let io_loop = io_uring::IoUringLoop::new( - 2000, - crate::net::DualStackLocalSocket::from_raw(raw_socket), - )?; + let io_loop = io_uring::IoUringLoop::new(2000, crate::net::Socket::completion(raw_socket))?; let buffer_pool = pool.buffer_pool.clone(); io_loop.spawn( diff --git a/src/components/proxy/sessions/reference.rs b/src/net/sessions/reference.rs similarity index 88% rename from src/components/proxy/sessions/reference.rs rename to src/net/sessions/reference.rs index 8d6f04fa29..aaeb8230c5 100644 --- a/src/components/proxy/sessions/reference.rs +++ b/src/net/sessions/reference.rs @@ -14,15 +14,15 @@ * limitations under the License. */ -use crate::{components::proxy, net::PacketQueue}; +use crate::net::{PipelineError, Socket, SystemSocket, packet::PacketQueue}; impl super::SessionPool { pub(super) fn spawn_session( self: std::sync::Arc, - raw_socket: socket2::Socket, + raw_socket: SystemSocket, port: u16, pending_sends: PacketQueue, - ) -> Result<(), proxy::PipelineError> { + ) -> Result<(), PipelineError> { let pool = self; uring_spawn!( @@ -30,14 +30,14 @@ impl super::SessionPool { async move { let mut last_received_at = None; - let socket = - std::sync::Arc::new(crate::net::DualStackLocalSocket::from_raw(raw_socket)); + let socket = std::sync::Arc::new(Socket::polling(raw_socket)); let socket2 = socket.clone(); let (tx, mut rx) = tokio::sync::oneshot::channel(); - uring_inner_spawn!(async move { + tokio::spawn(async move { let (pending_sends, mut sends_rx) = pending_sends; let mut sends_double_buffer = Vec::with_capacity(pending_sends.capacity()); + let sends_rx = sends_rx.as_polling_mut(); while sends_rx.changed().await.is_ok() { if !*sends_rx.borrow() { @@ -54,7 +54,7 @@ impl super::SessionPool { length = packet.data.len(), "sending packet upstream" ); - let (result, _) = socket2.send_to(packet.data, destination).await; + let result = socket2.send_to(packet.data, destination).await; let asn_info = packet.asn_info.as_ref().into(); match result { Ok(size) => { @@ -87,10 +87,9 @@ impl super::SessionPool { }); loop { - let buf = pool.buffer_pool.clone().alloc(); + let mut buf = pool.buffer_pool.clone().alloc(); tokio::select! { - received = socket.recv_from(buf) => { - let (result, buf) = received; + result = socket.recv_from(&mut *buf) => { match result { Err(error) => { tracing::trace!(%error, "error receiving packet"); diff --git a/src/test.rs b/src/test.rs index 6fd09d07aa..bdb2ce5423 100644 --- a/src/test.rs +++ b/src/test.rs @@ -25,7 +25,7 @@ use crate::{ collections::BufferPool, config::Config, filters::{FilterRegistry, prelude::*}, - net::DualStackEpollSocket as DualStackLocalSocket, + net::DualStackEpollSocket as Socket, net::endpoint::metadata::Value, net::endpoint::{Endpoint, EndpointAddress}, signal::{ShutdownKind, ShutdownRx, ShutdownTx}, @@ -69,6 +69,11 @@ impl From for SocketAddr { } } +/// Returns a random available port +pub fn available_port() -> u16 { + crate::net::SystemSocket::from_port(0).unwrap().port() +} + /// Returns a local address on a port that is not assigned to another test. /// If Random address tye is used, it might be v4, Might be v6. It's random. pub async fn available_addr(address_type: AddressType) -> SocketAddr { @@ -79,17 +84,17 @@ pub async fn available_addr(address_type: AddressType) -> SocketAddr { addr } -fn get_address(address_type: AddressType, socket: &DualStackLocalSocket) -> SocketAddr { +fn get_address(address_type: AddressType, socket: &Socket) -> SocketAddr { let addr = match address_type { AddressType::Random => { // sometimes give ipv6, sometimes ipv4. match rand::random() { - true => socket.local_ipv6_addr().unwrap(), - false => socket.local_ipv4_addr().unwrap(), + true => socket.local_ipv6_addr(), + false => socket.local_ipv4_addr(), } } - AddressType::Ipv4 => socket.local_ipv4_addr().unwrap(), - AddressType::Ipv6 => socket.local_ipv6_addr().unwrap(), + AddressType::Ipv4 => socket.local_ipv4_addr(), + AddressType::Ipv6 => socket.local_ipv6_addr(), }; tracing::debug!(addr = ?addr, "test_util::get_address"); addr @@ -150,7 +155,7 @@ pub struct TestHelper { /// Returned from [creating a socket](TestHelper::open_socket_and_recv_single_packet) pub struct OpenSocketRecvPacket { /// The opened socket - pub socket: Arc, + pub socket: Arc, /// A channel on which the received packet will be forwarded. pub packet_rx: oneshot::Receiver, } @@ -188,7 +193,7 @@ impl TestHelper { let socket_recv = socket.clone(); tokio::spawn(async move { let mut buf = vec![0; 1024]; - let (size, _) = socket_recv.recv_from(&mut buf).await.unwrap(); + let (size, _) = socket_recv.recv_from(&mut *buf).await.unwrap(); packet_tx .send(from_utf8(&buf[..size]).unwrap().to_string()) .unwrap(); @@ -200,7 +205,7 @@ impl TestHelper { /// returned channel. pub async fn open_socket_and_recv_multiple_packets( &mut self, - ) -> (mpsc::Receiver, Arc) { + ) -> (mpsc::Receiver, Arc) { let socket = Arc::new(create_socket().await); let packet_rx = self.recv_multiple_packets(&socket).await; (packet_rx, socket) @@ -209,18 +214,13 @@ impl TestHelper { // Same as above, but sometimes you just need an ipv4 socket pub async fn open_ipv4_socket_and_recv_multiple_packets( &mut self, - ) -> (mpsc::Receiver, Arc) { - let socket = Arc::new( - DualStackLocalSocket::new_with_address((Ipv4Addr::LOCALHOST, 0).into()).unwrap(), - ); + ) -> (mpsc::Receiver, Arc) { + let socket = Arc::new(Socket::polling_from_addr((Ipv4Addr::LOCALHOST, 0).into()).unwrap()); let packet_rx = self.recv_multiple_packets(&socket).await; (packet_rx, socket) } - async fn recv_multiple_packets( - &mut self, - socket: &Arc, - ) -> mpsc::Receiver { + async fn recv_multiple_packets(&mut self, socket: &Arc) -> mpsc::Receiver { let (packet_tx, packet_rx) = mpsc::channel::(10); let mut shutdown_rx = self.get_shutdown_subscriber().await; let socket_recv = socket.clone(); @@ -228,8 +228,8 @@ impl TestHelper { let mut buf = vec![0; 1024]; loop { tokio::select! { - received = socket_recv.recv_from(&mut buf) => { - let (size, _) = received.unwrap(); + result = socket_recv.recv_from(&mut *buf) => { + let (size, _) = result.unwrap(); let str = from_utf8(&buf[..size]).unwrap().to_string(); match packet_tx.send(str).await { Ok(_) => {} @@ -276,7 +276,7 @@ impl TestHelper { loop { let mut buf = vec![0; 1024]; tokio::select! { - recvd = socket.recv_from(&mut buf) => { + recvd = socket.recv_from(&mut *buf) => { let (size, sender) = recvd.unwrap(); let packet = &buf[..size]; tracing::trace!(%sender, %size, "echo server received and returning packet"); @@ -295,7 +295,7 @@ impl TestHelper { pub async fn run_server( &mut self, config: Arc, - server: Option, + server: Option, with_admin: Option>, ) -> u16 { let (shutdown_tx, shutdown_rx) = @@ -308,37 +308,18 @@ impl TestHelper { } let server = server.unwrap_or_else(|| { - let qcmp = crate::net::raw_socket_with_reuse(0).unwrap(); - let phoenix = crate::net::TcpListener::bind(None).unwrap(); - - crate::components::proxy::Proxy { - num_workers: std::num::NonZeroUsize::new(1).unwrap(), - socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()), - qcmp, - phoenix, - ..Default::default() - } - }); - - let (prox_tx, prox_rx) = tokio::sync::oneshot::channel(); - - let port = crate::net::socket_port(server.socket.as_ref().unwrap()); - - tokio::spawn(async move { - server - .run( - crate::components::RunArgs { - config, - ready: Default::default(), - shutdown_rx, - }, - Some(prox_tx), - ) - .await - .unwrap(); + crate::cli::Service::default() + .udp() + .udp_port(available_port()) + .udp_poll() + .phoenix() + .phoenix_port(available_port()) + .qcmp() + .qcmp_port(available_port()) }); - prox_rx.await.unwrap(); + let port = server.get_udp_port(); + server.spawn_services(&config, &shutdown_rx).unwrap(); port } @@ -416,8 +397,10 @@ pub fn map_addr_to_localhost(address: &mut SocketAddr) { } /// Opens a new socket bound to an ephemeral port -pub async fn create_socket() -> DualStackLocalSocket { - DualStackLocalSocket::new(0).unwrap() +pub async fn create_socket() -> Socket { + crate::net::io::Backend::Polling + .socket_from_port(0) + .unwrap() } fn test_proxy_id() -> String { @@ -534,7 +517,7 @@ mod tests { let msg = "hello"; endpoint .socket - .send_to(msg.as_bytes(), &echo_addr.to_socket_addr().unwrap()) + .send_to(msg.as_bytes(), echo_addr.to_socket_addr().unwrap()) .await .unwrap(); assert_eq!( diff --git a/tests/health.rs b/tests/health.rs index 0cbea84ec0..267b492820 100644 --- a/tests/health.rs +++ b/tests/health.rs @@ -20,9 +20,12 @@ use hyper::Uri; use quilkin::{net::endpoint::Endpoint, test::TestHelper}; const LIVE_ADDRESS: &str = "http://localhost:9093/live"; +const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(250); #[tokio::test] +#[ignore] async fn health_server() { + quilkin::test::enable_log("quilkin=trace"); let mut t = TestHelper::default(); // create server configuration @@ -40,15 +43,15 @@ async fn health_server() { Some(Some((std::net::Ipv6Addr::UNSPECIFIED, 9093).into())), ) .await; - tokio::time::sleep(std::time::Duration::from_millis(250)).await; + tokio::time::sleep(TIMEOUT).await; let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) .build_http::>(); use http_body_util::BodyExt; - let resp = client - .get(Uri::from_static(LIVE_ADDRESS)) + let resp = tokio::time::timeout(TIMEOUT, client.get(Uri::from_static(LIVE_ADDRESS))) .await .unwrap() + .unwrap() .into_body() .collect() .await @@ -62,6 +65,12 @@ async fn health_server() { panic!("oh no!"); }); - let resp = client.get(Uri::from_static(LIVE_ADDRESS)).await.unwrap(); + let resp = tokio::time::timeout( + std::time::Duration::from_millis(250), + client.get(Uri::from_static(LIVE_ADDRESS)), + ) + .await + .unwrap() + .unwrap(); assert!(resp.status().is_server_error(), "Should be unhealthy"); } diff --git a/tests/metrics.rs b/tests/metrics.rs index 546b6d3d22..17d1791138 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -69,7 +69,7 @@ async fn metrics_server() { // game_client let local_addr = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, client_port)); tracing::info!(address = %local_addr, "Sending hello"); - socket.send_to(b"hello", &local_addr).await.unwrap(); + socket.send_to(&b"hello"[..], local_addr).await.unwrap(); tokio::time::timeout(std::time::Duration::from_millis(100), recv_chan.recv()) .await diff --git a/tests/qcmp.rs b/tests/qcmp.rs index f38ecf0f9c..f95231cd63 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -18,48 +18,22 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::time::Duration; -use quilkin::{ - codec::qcmp::Protocol, - test::{AddressType, TestHelper}, -}; +use quilkin::{codec::qcmp::Protocol, test::TestHelper}; #[tokio::test] #[cfg_attr(target_os = "macos", ignore)] async fn proxy_ping() { let mut t = TestHelper::default(); - let qcmp = quilkin::net::raw_socket_with_reuse(0).unwrap(); - let qcmp_port = quilkin::net::socket_port(&qcmp); - let server_proxy = quilkin::components::proxy::Proxy { - qcmp, - to: vec![(Ipv4Addr::UNSPECIFIED, 0).into()], - ..<_>::default() - }; + let qcmp_port = quilkin::test::available_port(); + let server_proxy = quilkin::cli::Service::default() + .qcmp() + .qcmp_port(qcmp_port) + .udp_poll(); let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); t.run_server(server_config, Some(server_proxy), None).await; ping(qcmp_port).await; } -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn agent_ping() { - let qcmp_port = quilkin::test::available_addr(AddressType::Random) - .await - .port(); - let agent = quilkin::cli::Agent { - qcmp_port, - ..<_>::default() - }; - let server_config = std::sync::Arc::new(quilkin::Config::default_agent()); - let (_tx, rx) = quilkin::signal::channel(quilkin::signal::ShutdownKind::Testing); - tokio::spawn(async move { - agent - .run(None, server_config, Default::default(), rx) - .await - .expect("Agent should run"); - }); - ping(qcmp_port).await; -} - async fn ping(port: u16) { tokio::time::sleep(std::time::Duration::from_millis(250)).await; let socket = tokio::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))