Skip to content

Commit 60b3cc2

Browse files
committed
refactor(io): Refactor network types into net::io modules
1 parent beb9bc1 commit 60b3cc2

File tree

21 files changed

+609
-591
lines changed

21 files changed

+609
-591
lines changed

src/cli/service.rs

Lines changed: 20 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{future::Future, sync::Arc};
22

3-
use crate::{components::proxy::SessionPool, config::Config};
3+
use crate::config::Config;
44

55
#[derive(Debug, clap::Parser)]
66
#[command(next_help_heading = "Service Options")]
@@ -249,19 +249,23 @@ impl Service {
249249
let mut shutdown_rx = shutdown_rx.clone();
250250
let mds_task = self.publish_mds(config)?;
251251
let (phoenix_task, phoenix_finalizer) = self.publish_phoenix(config)?;
252-
// We need to call this before qcmp since if we use XDP we handle QCMP
253-
// internally without a separate task
254-
let (udp_task, finalizer, session_pool) = self.publish_udp(config)?;
255-
let qcmp_task = self.publish_qcmp(&shutdown_rx)?;
252+
let (udp_task, finalizer, session_pool) = self.listen_udp(config, &shutdown_rx)?;
256253
let xds_task = self.publish_xds(config)?;
257254

255+
tracing::info!(services=?[
256+
self.udp_enabled.then_some("udp"),
257+
self.qcmp_enabled.then_some("qcmp"),
258+
self.phoenix_enabled.then_some("phoenix"),
259+
self.xds_enabled.then_some("xds"),
260+
self.mds_enabled.then_some("mds"),
261+
].into_iter().flatten().collect::<Vec<&str>>(), "starting service listeners");
262+
258263
Ok(tokio::spawn(async move {
259264
tokio::task::spawn(async move {
260265
let (task, result) = tokio::select! {
261266
result = mds_task => ("mds", result),
262267
result = phoenix_task => ("phoenix", result),
263-
result = qcmp_task => ("qcmp", result),
264-
result = udp_task => ("udp", result),
268+
result = udp_task => ("udp/qcmp", result),
265269
result = xds_task => ("xds", result),
266270
};
267271

@@ -341,20 +345,6 @@ impl Service {
341345
Ok((std::future::pending(), None))
342346
}
343347

344-
/// Spawns an QCMP server if enabled, otherwise returns a future which never completes.
345-
fn publish_qcmp(
346-
&self,
347-
shutdown_rx: &crate::signal::ShutdownRx,
348-
) -> crate::Result<impl Future<Output = crate::Result<()>> + use<>> {
349-
if self.qcmp_enabled {
350-
tracing::info!(port=%self.qcmp_port, "starting qcmp service");
351-
let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
352-
crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone())?;
353-
}
354-
355-
Ok(std::future::pending())
356-
}
357-
358348
/// Spawns an xDS server if enabled, otherwise returns a future which never completes.
359349
fn publish_mds(
360350
&self,
@@ -409,9 +399,10 @@ impl Service {
409399
}
410400

411401
#[allow(clippy::type_complexity)]
412-
pub fn publish_udp(
402+
pub fn listen_udp(
413403
&mut self,
414404
config: &Arc<Config>,
405+
shutdown_rx: &crate::signal::ShutdownRx,
415406
) -> eyre::Result<(
416407
impl Future<Output = crate::Result<()>> + use<>,
417408
Option<Finalizer>,
@@ -421,166 +412,20 @@ impl Service {
421412
return Ok((either::Left(std::future::pending()), None, None));
422413
}
423414

424-
tracing::info!(port=%self.udp_port, "starting udp service");
425-
426-
#[cfg(target_os = "linux")]
427-
{
428-
match self.spawn_xdp(config.clone(), self.xdp.force_xdp) {
429-
Ok(xdp) => {
430-
if let Some(xdp) = xdp {
431-
self.qcmp_enabled = false;
432-
return Ok((either::Left(std::future::pending()), Some(xdp), None));
433-
} else if self.xdp.force_xdp {
434-
eyre::bail!("XDP was forced on, but failed to initialize");
435-
}
436-
}
437-
Err(err) => {
438-
if self.xdp.force_xdp {
439-
return Err(err);
440-
}
441-
442-
tracing::warn!(
443-
?err,
444-
"failed to spawn XDP I/O loop, falling back to io-uring"
445-
);
446-
}
447-
}
448-
}
449-
450415
if !self.udp_enabled {
451416
return Ok((either::Left(std::future::pending()), None, None));
452417
}
453418

454-
self.spawn_user_space_router(config.clone())
455-
.map(|(fut, func, sp)| (either::Right(fut), Some(func), Some(sp)))
456-
}
457-
458-
/// Launches the user space implementation of the packet router using
459-
/// sockets. This implementation uses a pool of buffers and sockets to
460-
/// manage UDP sessions and sockets. On Linux this will use io-uring, where
461-
/// as it will use epoll interfaces on non-Linux platforms.
462-
#[allow(clippy::type_complexity)]
463-
pub fn spawn_user_space_router(
464-
&self,
465-
config: Arc<Config>,
466-
) -> crate::Result<(
467-
impl Future<Output = crate::Result<()>> + use<>,
468-
Finalizer,
469-
Arc<crate::components::proxy::SessionPool>,
470-
)> {
471-
// If we're on linux, we're using io-uring, but we're probably running in a container
472-
// and may not be allowed to call io-uring related syscalls due to seccomp
473-
// profiles, so do a quick check here to validate that we can call io_uring_setup
474-
// https://www.man7.org/linux/man-pages/man2/io_uring_setup.2.html
475-
#[cfg(target_os = "linux")]
476-
{
477-
if let Err(err) = io_uring::IoUring::new(2) {
478-
fn in_container() -> bool {
479-
let sched = match std::fs::read_to_string("/proc/1/sched") {
480-
Ok(s) => s,
481-
Err(error) => {
482-
tracing::warn!(
483-
%error,
484-
"unable to read /proc/1/sched to determine if quilkin is in a container"
485-
);
486-
return false;
487-
}
488-
};
489-
let Some(line) = sched.lines().next() else {
490-
tracing::warn!("/proc/1/sched was empty");
491-
return false;
492-
};
493-
let Some(proc) = line.split(' ').next() else {
494-
tracing::warn!("first line of /proc/1/sched was empty");
495-
return false;
496-
};
497-
proc != "init" && proc != "systemd"
498-
}
499-
500-
if err.kind() == std::io::ErrorKind::PermissionDenied && in_container() {
501-
eyre::bail!(
502-
"failed to call `io_uring_setup` due to EPERM ({err}), quilkin seems to be running inside a container meaning this is likely due to the seccomp profile not allowing the syscall"
503-
);
504-
} else {
505-
eyre::bail!("failed to call `io_uring_setup` due to {err}");
506-
}
507-
}
508-
}
509-
510-
let socket = crate::net::raw_socket_with_reuse(self.udp_port)?;
511-
let workers = self.udp_workers.get();
512-
let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024));
513-
514-
let mut worker_sends = Vec::with_capacity(workers);
515-
let mut session_sends = Vec::with_capacity(workers);
516-
for _ in 0..workers {
517-
let queue = crate::net::queue(15)?;
518-
session_sends.push(queue.0.clone());
519-
worker_sends.push(queue);
520-
}
521-
522-
let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());
523-
524-
crate::components::proxy::packet_router::spawn_receivers(
419+
let (fut, finaliser, sessions) = crate::net::io::listen(
525420
config,
526-
socket,
527-
worker_sends,
528-
&sessions,
529-
buffer_pool,
421+
self.udp_enabled.then_some(self.udp_port),
422+
self.qcmp_enabled.then_some(self.qcmp_port),
423+
self.udp_workers.get(),
424+
self.xdp.clone(),
425+
shutdown_rx,
530426
)?;
531427

532-
Ok((
533-
std::future::pending(),
534-
Box::from(move |_shutdown_rx: &crate::signal::ShutdownRx| {}),
535-
sessions,
536-
))
537-
}
538-
539-
#[cfg(target_os = "linux")]
540-
fn spawn_xdp(&self, config: Arc<Config>, force_xdp: bool) -> eyre::Result<Option<Finalizer>> {
541-
use crate::net::xdp;
542-
use eyre::{Context as _, ContextCompat as _};
543-
544-
// TODO: remove this once it's been more stabilized
545-
if !force_xdp {
546-
return Ok(None);
547-
}
548-
549-
let filters = config
550-
.dyn_cfg
551-
.filters()
552-
.context("XDP requires a filter chain")?
553-
.clone();
554-
let clusters = config
555-
.dyn_cfg
556-
.clusters()
557-
.context("XDP requires a cluster map")?
558-
.clone();
559-
560-
let config = crate::net::xdp::process::ConfigState { filters, clusters };
561-
562-
let udp_port = if self.udp_enabled { self.udp_port } else { 0 };
563-
let qcmp_port = if self.qcmp_enabled { self.qcmp_port } else { 0 };
564-
565-
tracing::info!(udp_port, qcmp_port, "setting up xdp module");
566-
let workers = xdp::setup_xdp_io(xdp::XdpConfig {
567-
nic: self
568-
.xdp
569-
.network_interface
570-
.as_deref()
571-
.map_or(xdp::NicConfig::Default, xdp::NicConfig::Name),
572-
external_port: udp_port,
573-
qcmp_port,
574-
maximum_packet_memory: self.xdp.maximum_memory,
575-
require_zero_copy: self.xdp.force_zerocopy,
576-
require_tx_checksum: self.xdp.force_tx_checksum_offload,
577-
})
578-
.context("failed to setup XDP")?;
579-
580-
let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?;
581-
Ok(Some(Box::new(move |srx: &crate::signal::ShutdownRx| {
582-
io_loop.shutdown(*srx.borrow() == crate::signal::ShutdownKind::Normal);
583-
})))
428+
Ok((either::Right(fut), finaliser, sessions))
584429
}
585430
}
586431

src/components/proxy.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
pub(crate) mod error;
18-
pub mod packet_router;
19-
pub(crate) mod sessions;
17+
use crate::net::{error, udp::sessions};
2018

2119
use super::RunArgs;
2220
pub use error::PipelineError;

0 commit comments

Comments
 (0)