From f92738f4a785eae7ac7d2da444966a3a7232d487 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 13 Nov 2024 22:00:15 +0100 Subject: [PATCH] ref(server): make idle time of a http connection configurable --- CHANGELOG.md | 1 + relay-config/src/config.rs | 15 +- relay-server/src/services/server/acceptor.rs | 87 +++++--- relay-server/src/services/server/io.rs | 213 ++++++++++++++++--- relay-server/src/services/server/mod.rs | 9 +- 5 files changed, 256 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff27087221..cc02f4d1c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Allow `sample_rate` to be float type when deserializing `DynamicSamplingContext`. ([#4181](https://github.com/getsentry/relay/pull/4181)) - Support inbound filters for profiles. ([#4176](https://github.com/getsentry/relay/pull/4176)) - Scrub lower-case redis commands. ([#4235](https://github.com/getsentry/relay/pull/4235)) +- Makes the maximum idle time of a HTTP connection configurable. ([#4248](https://github.com/getsentry/relay/pull/4248)) **Internal**: diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 9370698e6f..5025831d6e 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -633,10 +633,17 @@ struct Limits { /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown /// signal. shutdown_timeout: u64, - /// server keep-alive timeout in seconds. + /// Server keep-alive timeout in seconds. /// /// By default keep-alive is set to a 5 seconds. keepalive_timeout: u64, + /// Server idle timeout in seconds. + /// + /// The idle timeout limits the amount of time a connection is kept open without activity. + /// Setting this too short may abort connections before Relay is able to send a response. + /// + /// By default there is no idle timeout. + idle_timeout: Option, /// The TCP listen backlog. /// /// Configures the TCP listen backlog for the listening socket of Relay. @@ -673,6 +680,7 @@ impl Default for Limits { query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, + idle_timeout: None, tcp_listen_backlog: 1024, } } @@ -2319,6 +2327,11 @@ impl Config { Duration::from_secs(self.values.limits.keepalive_timeout) } + /// Returns the server idle timeout in seconds. + pub fn idle_timeout(&self) -> Option { + self.values.limits.idle_timeout.map(Duration::from_secs) + } + /// TCP listen backlog to configure on Relay's listening socket. pub fn tcp_listen_backlog(&self) -> u32 { self.values.limits.tcp_listen_backlog diff --git a/relay-server/src/services/server/acceptor.rs b/relay-server/src/services/server/acceptor.rs index 2d061c044d..eb684a3c6f 100644 --- a/relay-server/src/services/server/acceptor.rs +++ b/relay-server/src/services/server/acceptor.rs @@ -1,7 +1,7 @@ use std::io; +use std::time::Duration; use axum_server::accept::Accept; -use relay_config::Config; use socket2::TcpKeepalive; use tokio::net::TcpStream; @@ -9,17 +9,63 @@ use crate::services::server::io::IdleTimeout; use crate::statsd::RelayCounters; #[derive(Clone, Debug, Default)] -pub struct RelayAcceptor(Option); +pub struct RelayAcceptor { + tcp_keepalive: Option, + idle_timeout: Option, +} impl RelayAcceptor { - /// Create a new acceptor that sets `TCP_NODELAY` and keep-alive. - pub fn new(config: &Config, keepalive_retries: u32) -> Self { - Self(build_keepalive(config, keepalive_retries)) + /// Creates a new [`RelayAcceptor`] which only configures `TCP_NODELAY`. + pub fn new() -> Self { + Default::default() + } + + /// Configures the acceptor to enable TCP keep-alive. + /// + /// The `timeout` is used to configure the keep-alive time as well as interval. + /// A zero duration timeout disables TCP keep-alive. + /// + /// `retries` configures the amount of keep-alive probes. + pub fn tcp_keepalive(mut self, timeout: Duration, retries: u32) -> Self { + if timeout.is_zero() { + self.tcp_keepalive = None; + return self; + } + + let mut ka = socket2::TcpKeepalive::new().with_time(timeout); + #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] + { + ka = ka.with_interval(timeout); + } + #[cfg(not(any( + target_os = "openbsd", + target_os = "redox", + target_os = "solaris", + target_os = "windows" + )))] + { + ka = ka.with_retries(retries); + } + self.tcp_keepalive = Some(ka); + + self + } + + /// Configures an idle timeout for the connection. + /// + /// Whenever there is no activity on a connection for the specified timeout, + /// the connection is closed. + /// + /// Note: This limits the total idle time of a duration and unlike read and write timeouts + /// also limits the time a connection is kept alive without requests. + pub fn idle_timeout(mut self, idle_timeout: Option) -> Self { + self.idle_timeout = idle_timeout; + self } } impl Accept for RelayAcceptor { - type Stream = std::pin::Pin>>; + type Stream = IdleTimeout; type Service = S; type Future = std::future::Ready>; @@ -27,7 +73,7 @@ impl Accept for RelayAcceptor { let mut keepalive = "ok"; let mut nodelay = "ok"; - if let Self(Some(ref tcp_keepalive)) = self { + if let Some(tcp_keepalive) = &self.tcp_keepalive { let sock_ref = socket2::SockRef::from(&stream); if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) { relay_log::trace!("error trying to set TCP keepalive: {e}"); @@ -46,32 +92,7 @@ impl Accept for RelayAcceptor { nodelay = nodelay ); - let stream = Box::pin(IdleTimeout::new(stream, std::time::Duration::from_secs(5))); + let stream = IdleTimeout::new(stream, self.idle_timeout); std::future::ready(Ok((stream, service))) } } - -fn build_keepalive(config: &Config, keepalive_retries: u32) -> Option { - let ka_timeout = config.keepalive_timeout(); - if ka_timeout.is_zero() { - return None; - } - - let mut ka = TcpKeepalive::new().with_time(ka_timeout); - #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] - { - ka = ka.with_interval(ka_timeout); - } - - #[cfg(not(any( - target_os = "openbsd", - target_os = "redox", - target_os = "solaris", - target_os = "windows" - )))] - { - ka = ka.with_retries(keepalive_retries); - } - - Some(ka) -} diff --git a/relay-server/src/services/server/io.rs b/relay-server/src/services/server/io.rs index e689908a8c..aa46f6f945 100644 --- a/relay-server/src/services/server/io.rs +++ b/relay-server/src/services/server/io.rs @@ -1,55 +1,75 @@ -use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +use futures::FutureExt; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::time::Sleep; +use tokio::time::{Instant, Sleep}; use crate::statsd::RelayCounters; -pin_project_lite::pin_project! { - pub struct IdleTimeout { - #[pin] - inner: T, - timeout: Duration, - #[pin] - sleep: Option, - } +/// A wrapper for [`AsyncRead`] and [`AsyncWrite`] streams with a maximum idle time. +/// +/// If there is no activity in the underlying stream for the specified `timeout` +/// the [`IdleTimeout`] will abort the operation and return [`std::error::ErrorKind::TimedOut`]. +pub struct IdleTimeout { + inner: T, + timeout: Option, + // `Box::pin` the sleep timer, the entire connection/stream is required to be `Unpin` anyways. + timer: Option>>, + is_idle: bool, } -impl IdleTimeout { - pub fn new(inner: T, timeout: Duration) -> Self { +impl IdleTimeout { + /// Creates a new [`IdleTimeout`] with the specified timeout. + /// + /// A `None` timeout is equivalent to an infinite timeout. + pub fn new(inner: T, timeout: Option) -> Self { Self { inner, timeout, - sleep: None, + timer: None, + is_idle: false, } } fn wrap_poll( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, poll_fn: F, ) -> Poll> where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll>, { - let mut this = self.project(); - match poll_fn(this.inner, cx) { + match poll_fn(Pin::new(&mut self.inner), cx) { Poll::Ready(ret) => { // Any activity on the stream resets the timeout. - this.sleep.set(None); + self.is_idle = false; Poll::Ready(ret) } Poll::Pending => { - // No activity on the stream, start the idle timeout. - if this.sleep.is_none() { - this.sleep.set(Some(tokio::time::sleep(*this.timeout))); - } + // No timeout configured -> nothing to do. + let Some(timeout) = self.timeout else { + return Poll::Pending; + }; + + let was_idle = self.is_idle; + self.is_idle = true; + + let timer = match &mut self.timer { + // No timer created and we're idle now, create a timer with the appropriate deadline. + entry @ None => entry.insert(Box::pin(tokio::time::sleep(timeout))), + Some(sleep) => { + // Only if we were not idle, we have to reset the schedule. + if !was_idle { + let deadline = Instant::now() + timeout; + sleep.as_mut().reset(deadline); + } + sleep + } + }; - let sleep = this.sleep.as_pin_mut().expect("sleep timer was just set"); - match sleep.poll(cx) { + match timer.poll_unpin(cx) { Poll::Ready(_) => { relay_log::trace!("closing idle server connection"); relay_statsd::metric!( @@ -64,10 +84,7 @@ impl IdleTimeout { } } -impl AsyncRead for IdleTimeout -where - T: AsyncRead, -{ +impl AsyncRead for IdleTimeout { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -77,10 +94,7 @@ where } } -impl AsyncWrite for IdleTimeout -where - T: AsyncWrite, -{ +impl AsyncWrite for IdleTimeout { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -112,3 +126,140 @@ where self.inner.is_write_vectored() } } + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + + use tokio::io::{AsyncReadExt, AsyncWriteExt, SimplexStream}; + + use super::*; + + macro_rules! assert_timeout { + ($duration:expr, $future:expr) => { + if let Ok(r) = tokio::time::timeout($duration, $future).await { + assert!( + false, + "expected {} to fail, but it returned {:?} in time", + stringify!($future), + r + ) + } + }; + } + + #[tokio::test(start_paused = true)] + async fn test_read() { + let (receiver, mut sender) = tokio::io::simplex(64); + let mut receiver = IdleTimeout::new(receiver, Some(Duration::from_secs(3))); + + assert_timeout!(Duration::from_millis(2900), receiver.read_u8()); + assert_timeout!(Duration::from_millis(70), receiver.read_u8()); + assert_timeout!(Duration::from_millis(29), receiver.read_u8()); + + sender.write_u8(1).await.unwrap(); + assert_eq!(receiver.read_u8().await.unwrap(), 1); + + // Timeout must be reset after reading. + assert_timeout!(Duration::from_millis(2900), receiver.read_u8()); + assert_timeout!(Duration::from_millis(70), receiver.read_u8()); + assert_timeout!(Duration::from_millis(29), receiver.read_u8()); + + // Only now it should fail. + assert_eq!( + receiver.read_u8().await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } + + #[tokio::test(start_paused = true)] + async fn test_read_no_idle_time() { + let (receiver, _sender) = tokio::io::simplex(64); + let mut receiver = IdleTimeout::new(receiver, None); + + // A year should be enough... + assert_timeout!(Duration::from_secs(365 * 24 * 3600), receiver.read_u8()); + } + + #[tokio::test(start_paused = true)] + async fn test_write() { + let (mut receiver, sender) = tokio::io::simplex(1); + let mut sender = IdleTimeout::new(sender, Some(Duration::from_secs(3))); + + // First byte can immediately write. + sender.write_u8(1).await.unwrap(); + // Second write, is blocked on the 1 byte sized buffer. + assert_timeout!(Duration::from_millis(2900), sender.write_u8(2)); + assert_timeout!(Duration::from_millis(70), sender.write_u8(2)); + assert_timeout!(Duration::from_millis(29), sender.write_u8(2)); + + // Consume the blocking byte and write successfully. + assert_eq!(receiver.read_u8().await.unwrap(), 1); + sender.write_u8(2).await.unwrap(); + + // Timeout must be reset. + assert_timeout!(Duration::from_millis(2900), sender.write_u8(3)); + assert_timeout!(Duration::from_millis(70), sender.write_u8(3)); + assert_timeout!(Duration::from_millis(29), sender.write_u8(3)); + + // Only now it should fail. + assert_eq!( + sender.write_u8(3).await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } + + #[tokio::test(start_paused = true)] + async fn test_write_no_timeout() { + let (_receiver, sender) = tokio::io::simplex(64); + let mut sender = IdleTimeout::new(sender, None); + + sender.write_u8(1).await.unwrap(); + // A year should be enough... + assert_timeout!(Duration::from_secs(365 * 24 * 3600), sender.write_u8(2)); + } + + #[tokio::test(start_paused = true)] + async fn test_read_write() { + let stream = SimplexStream::new_unsplit(1); + let mut stream = IdleTimeout::new(stream, Some(Duration::from_secs(3))); + + // First byte can immediately write. + stream.write_u8(1).await.unwrap(); + // And read. + assert_eq!(stream.read_u8().await.unwrap(), 1); + + // The buffer is empty, but we should not time out. + assert_timeout!(Duration::from_millis(2900), stream.read_u8()); + assert_timeout!(Duration::from_millis(70), stream.read_u8()); + assert_timeout!(Duration::from_millis(29), stream.read_u8()); + + // A write resets the read timer. + stream.write_u8(2).await.unwrap(); + tokio::time::advance(Duration::from_millis(2900)).await; + assert_eq!(stream.read_u8().await.unwrap(), 2); + + // Same for writes. + stream.write_u8(3).await.unwrap(); + assert_timeout!(Duration::from_millis(2900), stream.write_u8(3)); + assert_timeout!(Duration::from_millis(70), stream.write_u8(3)); + assert_timeout!(Duration::from_millis(29), stream.write_u8(3)); + + assert_eq!(stream.read_u8().await.unwrap(), 3); + tokio::time::advance(Duration::from_millis(2900)).await; + stream.write_u8(99).await.unwrap(); + + // Buffer is full and no one is clearing it, this should fail. + assert_eq!( + stream.write_u8(0).await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + + // Make sure reads are also timing out. + assert_eq!(stream.read_u8().await.unwrap(), 99); + assert_eq!( + stream.read_u8().await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } +} diff --git a/relay-server/src/services/server/mod.rs b/relay-server/src/services/server/mod.rs index 264b040047..d2661c1e98 100644 --- a/relay-server/src/services/server/mod.rs +++ b/relay-server/src/services/server/mod.rs @@ -115,11 +115,12 @@ fn listen(config: &Config) -> Result { fn serve(listener: TcpListener, app: App, config: Arc) { let handle = Handle::new(); + let acceptor = self::acceptor::RelayAcceptor::new() + .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES) + .idle_timeout(config.idle_timeout()); + let mut server = axum_server::from_tcp(listener) - .acceptor(self::acceptor::RelayAcceptor::new( - &config, - KEEPALIVE_RETRIES, - )) + .acceptor(acceptor) .handle(handle.clone()); server