Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ default = ["tracing", "tracing-log"]
# Configure `tracing` to log events via `log` if no `tracing` subscriber exists.
tracing-log = ["tracing/log"]
log = ["dep:log"]
# Use private Apple APIs to send multiple packets in a single syscall.
# Support private Apple APIs to send multiple packets in a single syscall.
fast-apple-datapath = []

[dependencies]
Expand Down
56 changes: 56 additions & 0 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,62 @@ impl EcnCodepoint {
}
}

/// C ABI entry point that switches a [`UdpSocketState`] over to Apple's fast UDP datapath.
///
/// The fast datapath is built on the private `sendmsg_x`/`recvmsg_x` APIs. Those APIs may
/// crash on OS versions that do not support them, so the caller is responsible for probing
/// availability *before* calling this function.
///
/// A null `state` is tolerated and treated as a no-op.
///
/// # Safety
///
/// `state` must either be null or point to a valid, initialized `UdpSocketState` that stays
/// alive for the duration of the call.
///
/// # Example
///
/// ```c
/// // Declaration on the C/C++ side:
/// extern "C" void quinn_udp_socket_state_enable_apple_fast_path(const void* state);
///
/// // Only enable after the availability probe succeeded:
/// if (apple_fast_datapath_available) {
///     quinn_udp_socket_state_enable_apple_fast_path(socket_state);
/// }
/// ```
#[cfg(apple_fast)]
#[no_mangle]
pub unsafe extern "C" fn quinn_udp_socket_state_enable_apple_fast_path(
    state: *const UdpSocketState,
) {
    if state.is_null() {
        return;
    }
    // SAFETY: `state` is non-null (checked above) and the caller guarantees that a non-null
    // pointer refers to a valid, initialized `UdpSocketState`.
    let socket_state = unsafe { &*state };
    socket_state.enable_apple_fast_path();
}

/// C ABI query: is Apple's fast UDP datapath enabled on the given [`UdpSocketState`]?
///
/// Returns `false` when `state` is null.
///
/// # Safety
///
/// `state` must either be null or point to a valid, initialized `UdpSocketState` that stays
/// alive for the duration of the call.
///
/// # Example
///
/// ```c
/// // Declaration on the C/C++ side:
/// extern "C" bool quinn_udp_socket_state_is_apple_fast_path_enabled(const void* state);
///
/// if (quinn_udp_socket_state_is_apple_fast_path_enabled(socket_state)) {
///     // Fast datapath is enabled
/// }
/// ```
#[cfg(apple_fast)]
#[no_mangle]
pub unsafe extern "C" fn quinn_udp_socket_state_is_apple_fast_path_enabled(
    state: *const UdpSocketState,
) -> bool {
    // SAFETY: the caller guarantees that a non-null `state` refers to a valid, initialized
    // `UdpSocketState`; `as_ref` itself maps a null pointer to `None`.
    match unsafe { state.as_ref() } {
        Some(socket_state) => socket_state.is_apple_fast_path_enabled(),
        None => false,
    }
}

#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
Expand Down
184 changes: 164 additions & 20 deletions quinn-udp/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ pub struct UdpSocketState {
/// In particular, we do not use IP_TOS cmsg_type in this case,
/// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
sendmsg_einval: AtomicBool,

/// Whether to use Apple's fast `sendmsg_x`/`recvmsg_x` APIs.
///
/// These private APIs provide better performance but may not be available on all
/// Apple OS versions. Callers must verify availability before enabling.
#[cfg(apple_fast)]
apple_fast_path: AtomicBool,
}

impl UdpSocketState {
Expand Down Expand Up @@ -191,6 +198,8 @@ impl UdpSocketState {
gro_segments: gro::gro_segments(),
may_fragment,
sendmsg_einval: AtomicBool::new(false),
#[cfg(apple_fast)]
apple_fast_path: AtomicBool::new(false),
})
}

Expand Down Expand Up @@ -231,7 +240,7 @@ impl UdpSocketState {
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> io::Result<usize> {
recv(socket.0, bufs, meta)
recv(self, socket.0, bufs, meta)
}

/// The maximum amount of segments which can be transmitted if a platform
Expand Down Expand Up @@ -295,6 +304,25 @@ impl UdpSocketState {
fn set_sendmsg_einval(&self) {
// Latch the `sendmsg_einval` flag. Per the field's documentation, the `IP_TOS` cmsg is no
// longer used once this is set.
self.sendmsg_einval.store(true, Ordering::Relaxed)
}

/// Opts this socket state into Apple's fast UDP datapath (`sendmsg_x`/`recvmsg_x`).
///
/// These are private APIs that may crash on OS versions lacking support, so the caller
/// must have verified availability before calling this. Besides flipping the fast-path
/// flag, this raises [`max_gso_segments`] to `BATCH_SIZE` so that batched sends become
/// possible.
///
/// [`max_gso_segments`]: Self::max_gso_segments
#[cfg(apple_fast)]
pub fn enable_apple_fast_path(&self) {
    let Self {
        apple_fast_path,
        max_gso_segments,
        ..
    } = self;
    // NOTE(review): both stores are `Relaxed`, so other threads are not guaranteed to
    // observe them in this order — confirm that is acceptable for concurrent senders.
    apple_fast_path.store(true, Ordering::Relaxed);
    max_gso_segments.store(BATCH_SIZE, Ordering::Relaxed);
}

/// Returns whether Apple's fast UDP datapath is enabled for this socket.
///
/// The flag starts out `false` when the state is constructed and becomes `true` once
/// [`enable_apple_fast_path`](Self::enable_apple_fast_path) has been called. The
/// `apple_fast` variants of `send` and `recv` read it to choose between the
/// `sendmsg_x`/`recvmsg_x` implementations and the single-message ones.
#[cfg(apple_fast)]
pub fn is_apple_fast_path_enabled(&self) -> bool {
self.apple_fast_path.load(Ordering::Relaxed)
}
}

#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))]
Expand Down Expand Up @@ -384,6 +412,20 @@ fn send(

/// Sends `transmit` on `apple_fast` builds, choosing the implementation at runtime.
///
/// With the fast path enabled on `state`, the batched `sendmsg_x` implementation is used;
/// otherwise the transmit goes through [`send_single`].
#[cfg(apple_fast)]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
    if !state.is_apple_fast_path_enabled() {
        return send_single(state, io, transmit);
    }
    send_via_sendmsg_x(state, io, transmit)
}

/// Send using the fast `sendmsg_x` API.
#[cfg(apple_fast)]
fn send_via_sendmsg_x(
state: &UdpSocketState,
io: SockRef<'_>,
transmit: &Transmit<'_>,
) -> io::Result<()> {
let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() };
let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
Expand All @@ -397,7 +439,7 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io:
.enumerate()
.take(BATCH_SIZE)
{
prepare_msg(
prepare_msg_x(
&Transmit {
destination: transmit.destination,
ecn: transmit.ecn,
Expand Down Expand Up @@ -433,6 +475,11 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io:

/// Sends `transmit` on OpenBSD, NetBSD and `apple_slow` builds.
///
/// This variant simply delegates to [`send_single`].
#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
send_single(state, io, transmit)
}

#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple))]
fn send_single(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
let mut iov: libc::iovec = unsafe { mem::zeroed() };
let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
Expand Down Expand Up @@ -469,7 +516,12 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io:
target_os = "dragonfly",
solarish
)))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
fn recv(
_state: &UdpSocketState,
io: SockRef<'_>,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> io::Result<usize> {
let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE];
let mut hdrs = unsafe { mem::zeroed::<[libc::mmsghdr; BATCH_SIZE]>() };
Expand Down Expand Up @@ -511,7 +563,26 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) ->
}

#[cfg(apple_fast)]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
/// Receives datagrams on `apple_fast` builds, choosing the implementation at runtime.
///
/// With the fast path enabled on `state` this goes through [`recv_via_recvmsg_x`];
/// otherwise it falls back to [`recv_single`].
fn recv(
    state: &UdpSocketState,
    io: SockRef<'_>,
    bufs: &mut [IoSliceMut<'_>],
    meta: &mut [RecvMeta],
) -> io::Result<usize> {
    if !state.is_apple_fast_path_enabled() {
        return recv_single(io, bufs, meta);
    }
    recv_via_recvmsg_x(io, bufs, meta)
}

/// Receive using the fast `recvmsg_x` API.
#[cfg(apple_fast)]
fn recv_via_recvmsg_x(
io: SockRef<'_>,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> io::Result<usize> {
let mut names = [MaybeUninit::<libc::sockaddr_storage>::uninit(); BATCH_SIZE];
// MacOS 10.15 `recvmsg_x` does not override the `msghdr_x`
// `msg_controllen`. Thus, after the call to `recvmsg_x`, one does not know
Expand All @@ -523,7 +594,7 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) ->
let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() };
let max_msg_count = bufs.len().min(BATCH_SIZE);
for i in 0..max_msg_count {
prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
prepare_recv_x(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]);
}
let msg_count = loop {
let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0) };
Expand Down Expand Up @@ -552,7 +623,27 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) ->
solarish,
apple_slow
))]
fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result<usize> {
/// Receives datagrams by delegating directly to [`recv_single`].
///
/// `_state` is not read here; the parameter exists because `UdpSocketState::recv` passes
/// the state to every platform's `recv`.
fn recv(
_state: &UdpSocketState,
io: SockRef<'_>,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> io::Result<usize> {
recv_single(io, bufs, meta)
}

#[cfg(any(
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
solarish,
apple
))]
fn recv_single(
io: SockRef<'_>,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> io::Result<usize> {
let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
let mut hdr = unsafe { mem::zeroed::<libc::msghdr>() };
Expand Down Expand Up @@ -584,8 +675,7 @@ const CMSG_LEN: usize = 88;
fn prepare_msg(
transmit: &Transmit<'_>,
dst_addr: &socket2::SockAddr,
#[cfg(not(apple_fast))] hdr: &mut libc::msghdr,
#[cfg(apple_fast)] hdr: &mut msghdr_x,
hdr: &mut libc::msghdr,
iov: &mut libc::iovec,
ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
#[allow(unused_variables)] // only used on FreeBSD & macOS
Expand Down Expand Up @@ -668,7 +758,66 @@ fn prepare_msg(
encoder.finish();
}

#[cfg(not(apple_fast))]
/// Fills in an `msghdr_x`, its `iovec` and its control buffer for use with `sendmsg_x`.
///
/// `hdr` is pointed at `dst_addr`, at `iov` (which in turn is pointed at
/// `transmit.contents`) and at `ctrl`. The control buffer then receives:
///
/// - the ECN value (`0` when `transmit.ecn` is `None`): as `IP_TOS` for IPv4 and
///   IPv4-mapped IPv6 destinations, unless `sendmsg_einval` is set, in which case it is
///   omitted; as `IPV6_TCLASS` for every other destination;
/// - the source address from `transmit.src_ip`, if present: `IP_RECVDSTADDR` for IPv4
///   (only when `encode_src_ip` is set), or `IPV6_PKTINFO` with interface index 0 for IPv6.
///
/// After this call `hdr` and `iov` hold raw pointers into `dst_addr`, `transmit.contents`
/// and `ctrl`, so those must remain valid for as long as `hdr` is in use.
#[cfg(apple_fast)]
fn prepare_msg_x(
    transmit: &Transmit<'_>,
    dst_addr: &socket2::SockAddr,
    hdr: &mut msghdr_x,
    iov: &mut libc::iovec,
    ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
    #[allow(unused_variables)] // same attribute as on `prepare_msg`'s parameter
    encode_src_ip: bool,
    sendmsg_einval: bool,
) {
    // Payload: one iovec spanning the whole of `transmit.contents`.
    iov.iov_len = transmit.contents.len();
    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;

    // Destination address and payload vector.
    hdr.msg_name = dst_addr.as_ptr() as *mut libc::c_void as *mut _;
    hdr.msg_namelen = dst_addr.len();
    hdr.msg_iov = iov;
    hdr.msg_iovlen = 1;

    // The control buffer must be attached before the encoder is created on `hdr`.
    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;
    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };

    let ecn = match transmit.ecn {
        Some(codepoint) => codepoint as libc::c_int,
        None => 0,
    };
    // IPv4-mapped IPv6 destinations are treated like plain IPv4 ones.
    let dst_is_ipv4 = match transmit.destination.ip() {
        IpAddr::V4(_) => true,
        IpAddr::V6(addr) => addr.to_ipv4_mapped().is_some(),
    };
    if !dst_is_ipv4 {
        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
    } else if !sendmsg_einval {
        encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
    }

    match &transmit.src_ip {
        Some(IpAddr::V4(v4)) if encode_src_ip => {
            let addr = libc::in_addr {
                s_addr: u32::from_ne_bytes(v4.octets()),
            };
            encoder.push(libc::IPPROTO_IP, libc::IP_RECVDSTADDR, addr);
        }
        Some(IpAddr::V6(v6)) => {
            let pktinfo = libc::in6_pktinfo {
                ipi6_ifindex: 0,
                ipi6_addr: libc::in6_addr {
                    s6_addr: v6.octets(),
                },
            };
            encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo);
        }
        // IPv4 source without `encode_src_ip`, or no source address at all: nothing to add.
        Some(IpAddr::V4(_)) | None => {}
    }

    encoder.finish();
}

fn prepare_recv(
buf: &mut IoSliceMut<'_>,
name: &mut MaybeUninit<libc::sockaddr_storage>,
Expand All @@ -684,8 +833,9 @@ fn prepare_recv(
hdr.msg_flags = 0;
}

/// Prepares an `msghdr_x` for receiving with `recvmsg_x`.
#[cfg(apple_fast)]
fn prepare_recv(
fn prepare_recv_x(
buf: &mut IoSliceMut<'_>,
name: &mut MaybeUninit<libc::sockaddr_storage>,
ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>,
Expand Down Expand Up @@ -991,24 +1141,18 @@ mod gso {
// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not
// offloaded to the NIC or even the kernel, but instead done here in user space in
// [`send`] and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`).
// The initial value is 1 (no batching); callers can enable batching via
// `UdpSocketState::enable_apple_fast_path()` which updates `max_gso_segments`.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod gso {
use super::*;

pub(super) fn max_gso_segments() -> usize {
#[cfg(apple_fast)]
{
BATCH_SIZE
}
#[cfg(not(apple_fast))]
{
1
}
1
}

pub(super) fn set_segment_size(
#[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
#[cfg(apple_fast)] _encoder: &mut cmsg::Encoder<'_, msghdr_x>,
_encoder: &mut cmsg::Encoder<'_, libc::msghdr>,
_segment_size: u16,
) {
}
Expand Down
Loading
Loading