From f650706cd2fc5d2a9a9e884b1289576bd91c3737 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 13 Nov 2024 20:07:16 +0100 Subject: [PATCH 1/3] pin projection --- relay-server/src/services/server/acceptor.rs | 77 ++++++++++++ relay-server/src/services/server/io.rs | 114 ++++++++++++++++++ .../src/services/{server.rs => server/mod.rs} | 82 ++----------- relay-server/src/statsd.rs | 3 + 4 files changed, 203 insertions(+), 73 deletions(-) create mode 100644 relay-server/src/services/server/acceptor.rs create mode 100644 relay-server/src/services/server/io.rs rename relay-server/src/services/{server.rs => server/mod.rs} (75%) diff --git a/relay-server/src/services/server/acceptor.rs b/relay-server/src/services/server/acceptor.rs new file mode 100644 index 0000000000..2d061c044d --- /dev/null +++ b/relay-server/src/services/server/acceptor.rs @@ -0,0 +1,77 @@ +use std::io; + +use axum_server::accept::Accept; +use relay_config::Config; +use socket2::TcpKeepalive; +use tokio::net::TcpStream; + +use crate::services::server::io::IdleTimeout; +use crate::statsd::RelayCounters; + +#[derive(Clone, Debug, Default)] +pub struct RelayAcceptor(Option); + +impl RelayAcceptor { + /// Create a new acceptor that sets `TCP_NODELAY` and keep-alive. 
+ pub fn new(config: &Config, keepalive_retries: u32) -> Self { + Self(build_keepalive(config, keepalive_retries)) + } +} + +impl Accept for RelayAcceptor { + type Stream = std::pin::Pin>>; + type Service = S; + type Future = std::future::Ready>; + + fn accept(&self, stream: TcpStream, service: S) -> Self::Future { + let mut keepalive = "ok"; + let mut nodelay = "ok"; + + if let Self(Some(ref tcp_keepalive)) = self { + let sock_ref = socket2::SockRef::from(&stream); + if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) { + relay_log::trace!("error trying to set TCP keepalive: {e}"); + keepalive = "error"; + } + } + + if let Err(e) = stream.set_nodelay(true) { + relay_log::trace!("failed to set TCP_NODELAY: {e}"); + nodelay = "error"; + } + + relay_statsd::metric!( + counter(RelayCounters::ServerSocketAccept) += 1, + keepalive = keepalive, + nodelay = nodelay + ); + + let stream = Box::pin(IdleTimeout::new(stream, std::time::Duration::from_secs(5))); + std::future::ready(Ok((stream, service))) + } +} + +fn build_keepalive(config: &Config, keepalive_retries: u32) -> Option { + let ka_timeout = config.keepalive_timeout(); + if ka_timeout.is_zero() { + return None; + } + + let mut ka = TcpKeepalive::new().with_time(ka_timeout); + #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] + { + ka = ka.with_interval(ka_timeout); + } + + #[cfg(not(any( + target_os = "openbsd", + target_os = "redox", + target_os = "solaris", + target_os = "windows" + )))] + { + ka = ka.with_retries(keepalive_retries); + } + + Some(ka) +} diff --git a/relay-server/src/services/server/io.rs b/relay-server/src/services/server/io.rs new file mode 100644 index 0000000000..e689908a8c --- /dev/null +++ b/relay-server/src/services/server/io.rs @@ -0,0 +1,114 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::time::Sleep; + +use 
crate::statsd::RelayCounters; + +pin_project_lite::pin_project! { + pub struct IdleTimeout { + #[pin] + inner: T, + timeout: Duration, + #[pin] + sleep: Option, + } +} + +impl IdleTimeout { + pub fn new(inner: T, timeout: Duration) -> Self { + Self { + inner, + timeout, + sleep: None, + } + } + + fn wrap_poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + poll_fn: F, + ) -> Poll> + where + F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll>, + { + let mut this = self.project(); + match poll_fn(this.inner, cx) { + Poll::Ready(ret) => { + // Any activity on the stream resets the timeout. + this.sleep.set(None); + Poll::Ready(ret) + } + Poll::Pending => { + // No activity on the stream, start the idle timeout. + if this.sleep.is_none() { + this.sleep.set(Some(tokio::time::sleep(*this.timeout))); + } + + let sleep = this.sleep.as_pin_mut().expect("sleep timer was just set"); + match sleep.poll(cx) { + Poll::Ready(_) => { + relay_log::trace!("closing idle server connection"); + relay_statsd::metric!( + counter(RelayCounters::ServerConnectionIdleTimeout) += 1 + ); + Poll::Ready(Err(std::io::ErrorKind::TimedOut.into())) + } + Poll::Pending => Poll::Pending, + } + } + } + } +} + +impl AsyncRead for IdleTimeout +where + T: AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_read(cx, buf)) + } +} + +impl AsyncWrite for IdleTimeout +where + T: AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_write(cx, buf)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_flush(cx)) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_shutdown(cx)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut 
Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_write_vectored(cx, bufs)) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server/mod.rs similarity index 75% rename from relay-server/src/services/server.rs rename to relay-server/src/services/server/mod.rs index 659485211e..264b040047 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server/mod.rs @@ -1,4 +1,3 @@ -use std::io; use std::net::{SocketAddr, TcpListener}; use std::sync::Arc; use std::time::Duration; @@ -6,13 +5,11 @@ use std::time::Duration; use axum::extract::Request; use axum::http::{header, HeaderName, HeaderValue}; use axum::ServiceExt; -use axum_server::accept::Accept; use axum_server::Handle; use hyper_util::rt::TokioTimer; use relay_config::Config; use relay_system::{Controller, Service, Shutdown}; -use socket2::TcpKeepalive; -use tokio::net::{TcpSocket, TcpStream}; +use tokio::net::TcpSocket; use tower::ServiceBuilder; use tower_http::compression::predicate::SizeAbove; use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate}; @@ -26,6 +23,9 @@ use crate::middlewares::{ use crate::service::ServiceState; use crate::statsd::{RelayCounters, RelayGauges}; +mod acceptor; +mod io; + /// Set the number of keep-alive retransmissions to be carried out before declaring that remote end /// is not available. const KEEPALIVE_RETRIES: u32 = 5; @@ -47,7 +47,7 @@ const COMPRESSION_MIN_SIZE: u16 = 128; pub enum ServerError { /// Binding failed. #[error("bind to interface failed")] - BindFailed(#[from] io::Error), + BindFailed(#[from] std::io::Error), /// TLS support was not compiled in. #[error("SSL is no longer supported by Relay, please use a proxy in front")] @@ -112,78 +112,14 @@ fn listen(config: &Config) -> Result { Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?) 
} -fn build_keepalive(config: &Config) -> Option { - let ka_timeout = config.keepalive_timeout(); - if ka_timeout.is_zero() { - return None; - } - - let mut ka = TcpKeepalive::new().with_time(ka_timeout); - #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] - { - ka = ka.with_interval(ka_timeout); - } - - #[cfg(not(any( - target_os = "openbsd", - target_os = "redox", - target_os = "solaris", - target_os = "windows" - )))] - { - ka = ka.with_retries(KEEPALIVE_RETRIES); - } - - Some(ka) -} - -#[derive(Clone, Debug, Default)] -pub struct KeepAliveAcceptor(Option); - -impl KeepAliveAcceptor { - /// Create a new acceptor that sets `TCP_NODELAY` and keep-alive. - pub fn new(config: &Config) -> Self { - Self(build_keepalive(config)) - } -} - -impl Accept for KeepAliveAcceptor { - type Stream = TcpStream; - type Service = S; - type Future = std::future::Ready>; - - fn accept(&self, stream: TcpStream, service: S) -> Self::Future { - let mut keepalive = "ok"; - let mut nodelay = "ok"; - - if let Self(Some(ref tcp_keepalive)) = self { - let sock_ref = socket2::SockRef::from(&stream); - if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) { - relay_log::trace!("error trying to set TCP keepalive: {e}"); - keepalive = "error"; - } - } - - if let Err(e) = stream.set_nodelay(true) { - relay_log::trace!("failed to set TCP_NODELAY: {e}"); - nodelay = "error"; - } - - relay_statsd::metric!( - counter(RelayCounters::ServerSocketAccept) += 1, - keepalive = keepalive, - nodelay = nodelay - ); - - std::future::ready(Ok((stream, service))) - } -} - fn serve(listener: TcpListener, app: App, config: Arc) { let handle = Handle::new(); let mut server = axum_server::from_tcp(listener) - .acceptor(KeepAliveAcceptor::new(&config)) + .acceptor(self::acceptor::RelayAcceptor::new( + &config, + KEEPALIVE_RETRIES, + )) .handle(handle.clone()); server diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 69607b4005..5b5233a9de 100644 --- 
a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -867,6 +867,8 @@ pub enum RelayCounters { ReplayExceededSegmentLimit, /// Incremented every time the server accepts a new connection. ServerSocketAccept, + /// Incremented every time the server aborts a connection because of an idle timeout. + ServerConnectionIdleTimeout, } impl CounterMetric for RelayCounters { @@ -913,6 +915,7 @@ impl CounterMetric for RelayCounters { RelayCounters::BucketsDropped => "metrics.buckets.dropped", RelayCounters::ReplayExceededSegmentLimit => "replay.segment_limit_exceeded", RelayCounters::ServerSocketAccept => "server.http.accepted", + RelayCounters::ServerConnectionIdleTimeout => "server.http.idle_timeout", } } } From eca7427780b0dd35a9fd3f2d27182f0415641fb1 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 13 Nov 2024 22:00:15 +0100 Subject: [PATCH 2/3] ref(server): make idle time of a http connection configurable --- CHANGELOG.md | 1 + relay-config/src/config.rs | 15 +- relay-server/src/services/server/acceptor.rs | 87 +++++--- relay-server/src/services/server/io.rs | 213 ++++++++++++++++--- relay-server/src/services/server/mod.rs | 9 +- 5 files changed, 256 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff27087221..cc02f4d1c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Allow `sample_rate` to be float type when deserializing `DynamicSamplingContext`. ([#4181](https://github.com/getsentry/relay/pull/4181)) - Support inbound filters for profiles. ([#4176](https://github.com/getsentry/relay/pull/4176)) - Scrub lower-case redis commands. ([#4235](https://github.com/getsentry/relay/pull/4235)) +- Makes the maximum idle time of a HTTP connection configurable. 
([#4248](https://github.com/getsentry/relay/pull/4248)) **Internal**: diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 9370698e6f..5025831d6e 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -633,10 +633,17 @@ struct Limits { /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown /// signal. shutdown_timeout: u64, - /// server keep-alive timeout in seconds. + /// Server keep-alive timeout in seconds. /// /// By default keep-alive is set to a 5 seconds. keepalive_timeout: u64, + /// Server idle timeout in seconds. + /// + /// The idle timeout limits the amount of time a connection is kept open without activity. + /// Setting this too short may abort connections before Relay is able to send a response. + /// + /// By default there is no idle timeout. + idle_timeout: Option, /// The TCP listen backlog. /// /// Configures the TCP listen backlog for the listening socket of Relay. @@ -673,6 +680,7 @@ impl Default for Limits { query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, + idle_timeout: None, tcp_listen_backlog: 1024, } } @@ -2319,6 +2327,11 @@ impl Config { Duration::from_secs(self.values.limits.keepalive_timeout) } + /// Returns the server idle timeout in seconds. + pub fn idle_timeout(&self) -> Option { + self.values.limits.idle_timeout.map(Duration::from_secs) + } + /// TCP listen backlog to configure on Relay's listening socket. 
pub fn tcp_listen_backlog(&self) -> u32 { self.values.limits.tcp_listen_backlog diff --git a/relay-server/src/services/server/acceptor.rs b/relay-server/src/services/server/acceptor.rs index 2d061c044d..eb684a3c6f 100644 --- a/relay-server/src/services/server/acceptor.rs +++ b/relay-server/src/services/server/acceptor.rs @@ -1,7 +1,7 @@ use std::io; +use std::time::Duration; use axum_server::accept::Accept; -use relay_config::Config; use socket2::TcpKeepalive; use tokio::net::TcpStream; @@ -9,17 +9,63 @@ use crate::services::server::io::IdleTimeout; use crate::statsd::RelayCounters; #[derive(Clone, Debug, Default)] -pub struct RelayAcceptor(Option); +pub struct RelayAcceptor { + tcp_keepalive: Option, + idle_timeout: Option, +} impl RelayAcceptor { - /// Create a new acceptor that sets `TCP_NODELAY` and keep-alive. - pub fn new(config: &Config, keepalive_retries: u32) -> Self { - Self(build_keepalive(config, keepalive_retries)) + /// Creates a new [`RelayAcceptor`] which only configures `TCP_NODELAY`. + pub fn new() -> Self { + Default::default() + } + + /// Configures the acceptor to enable TCP keep-alive. + /// + /// The `timeout` is used to configure the keep-alive time as well as interval. + /// A zero duration timeout disables TCP keep-alive. + /// + /// `retries` configures the amount of keep-alive probes. + pub fn tcp_keepalive(mut self, timeout: Duration, retries: u32) -> Self { + if timeout.is_zero() { + self.tcp_keepalive = None; + return self; + } + + let mut ka = socket2::TcpKeepalive::new().with_time(timeout); + #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] + { + ka = ka.with_interval(timeout); + } + #[cfg(not(any( + target_os = "openbsd", + target_os = "redox", + target_os = "solaris", + target_os = "windows" + )))] + { + ka = ka.with_retries(retries); + } + self.tcp_keepalive = Some(ka); + + self + } + + /// Configures an idle timeout for the connection. 
+ /// + /// Whenever there is no activity on a connection for the specified timeout, + /// the connection is closed. + /// + /// Note: This limits the total idle time of a connection and unlike read and write timeouts + /// also limits the time a connection is kept alive without requests. + pub fn idle_timeout(mut self, idle_timeout: Option) -> Self { + self.idle_timeout = idle_timeout; + self } } impl Accept for RelayAcceptor { - type Stream = std::pin::Pin>>; + type Stream = IdleTimeout; type Service = S; type Future = std::future::Ready>; @@ -27,7 +73,7 @@ impl Accept for RelayAcceptor { let mut keepalive = "ok"; let mut nodelay = "ok"; - if let Self(Some(ref tcp_keepalive)) = self { + if let Some(tcp_keepalive) = &self.tcp_keepalive { let sock_ref = socket2::SockRef::from(&stream); if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) { relay_log::trace!("error trying to set TCP keepalive: {e}"); keepalive = "error"; } } @@ -46,32 +92,7 @@ impl Accept for RelayAcceptor { nodelay = nodelay ); - let stream = Box::pin(IdleTimeout::new(stream, std::time::Duration::from_secs(5))); + let stream = IdleTimeout::new(stream, self.idle_timeout); std::future::ready(Ok((stream, service))) } } - -fn build_keepalive(config: &Config, keepalive_retries: u32) -> Option { - let ka_timeout = config.keepalive_timeout(); - if ka_timeout.is_zero() { - return None; - } - - let mut ka = TcpKeepalive::new().with_time(ka_timeout); - #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] - { - ka = ka.with_interval(ka_timeout); - } - - #[cfg(not(any( - target_os = "openbsd", - target_os = "redox", - target_os = "solaris", - target_os = "windows" - )))] - { - ka = ka.with_retries(keepalive_retries); - } - - Some(ka) -} diff --git a/relay-server/src/services/server/io.rs b/relay-server/src/services/server/io.rs index e689908a8c..941b656c09 100644 --- a/relay-server/src/services/server/io.rs +++ b/relay-server/src/services/server/io.rs @@ -1,55 +1,75 @@ -use std::future::Future; use 
std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +use futures::FutureExt; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::time::Sleep; +use tokio::time::{Instant, Sleep}; use crate::statsd::RelayCounters; -pin_project_lite::pin_project! { - pub struct IdleTimeout { - #[pin] - inner: T, - timeout: Duration, - #[pin] - sleep: Option, - } +/// A wrapper for [`AsyncRead`] and [`AsyncWrite`] streams with a maximum idle time. +/// +/// If there is no activity in the underlying stream for the specified `timeout` +/// the [`IdleTimeout`] will abort the operation and return [`std::io::ErrorKind::TimedOut`]. +pub struct IdleTimeout { + inner: T, + timeout: Option, + // `Box::pin` the sleep timer, the entire connection/stream is required to be `Unpin` anyways. + timer: Option>>, + is_idle: bool, } -impl IdleTimeout { - pub fn new(inner: T, timeout: Duration) -> Self { +impl IdleTimeout { + /// Creates a new [`IdleTimeout`] with the specified timeout. + /// + /// A `None` timeout is equivalent to an infinite timeout. + pub fn new(inner: T, timeout: Option) -> Self { Self { inner, timeout, - sleep: None, + timer: None, + is_idle: false, } } fn wrap_poll( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, poll_fn: F, ) -> Poll> where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll>, { - let mut this = self.project(); - match poll_fn(this.inner, cx) { + match poll_fn(Pin::new(&mut self.inner), cx) { Poll::Ready(ret) => { // Any activity on the stream resets the timeout. - this.sleep.set(None); + self.is_idle = false; Poll::Ready(ret) } Poll::Pending => { - // No activity on the stream, start the idle timeout. - if this.sleep.is_none() { - this.sleep.set(Some(tokio::time::sleep(*this.timeout))); - } + // No timeout configured -> nothing to do. 
+ let Some(timeout) = self.timeout else { + return Poll::Pending; + }; + + let was_idle = self.is_idle; + self.is_idle = true; + + let timer = match &mut self.timer { + // No timer created and we're idle now, create a timer with the appropriate deadline. + entry @ None => entry.insert(Box::pin(tokio::time::sleep(timeout))), + Some(sleep) => { + // Only if we were not idle, we have to reset the schedule. + if !was_idle { + let deadline = Instant::now() + timeout; + sleep.as_mut().reset(deadline); + } + sleep + } + }; - let sleep = this.sleep.as_pin_mut().expect("sleep timer was just set"); - match sleep.poll(cx) { + match timer.poll_unpin(cx) { Poll::Ready(_) => { relay_log::trace!("closing idle server connection"); relay_statsd::metric!( @@ -64,10 +84,7 @@ impl IdleTimeout { } } -impl AsyncRead for IdleTimeout -where - T: AsyncRead, -{ +impl AsyncRead for IdleTimeout { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -77,10 +94,7 @@ where } } -impl AsyncWrite for IdleTimeout -where - T: AsyncWrite, -{ +impl AsyncWrite for IdleTimeout { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -112,3 +126,140 @@ where self.inner.is_write_vectored() } } + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + + use tokio::io::{AsyncReadExt, AsyncWriteExt, SimplexStream}; + + use super::*; + + macro_rules! 
assert_timeout { + ($duration:expr, $future:expr) => { + if let Ok(r) = tokio::time::timeout($duration, $future).await { + assert!( + false, + "expected {} to fail, but it returned {:?} in time", + stringify!($future), + r + ) + } + }; + } + + #[tokio::test(start_paused = true)] + async fn test_read() { + let (receiver, mut sender) = tokio::io::simplex(64); + let mut receiver = IdleTimeout::new(receiver, Some(Duration::from_secs(3))); + + assert_timeout!(Duration::from_millis(2900), receiver.read_u8()); + assert_timeout!(Duration::from_millis(70), receiver.read_u8()); + assert_timeout!(Duration::from_millis(29), receiver.read_u8()); + + sender.write_u8(1).await.unwrap(); + assert_eq!(receiver.read_u8().await.unwrap(), 1); + + // Timeout must be reset after reading. + assert_timeout!(Duration::from_millis(2900), receiver.read_u8()); + assert_timeout!(Duration::from_millis(70), receiver.read_u8()); + assert_timeout!(Duration::from_millis(29), receiver.read_u8()); + + // Only now it should fail. + assert_eq!( + receiver.read_u8().await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } + + #[tokio::test(start_paused = true)] + async fn test_read_no_idle_time() { + let (receiver, _sender) = tokio::io::simplex(64); + let mut receiver = IdleTimeout::new(receiver, None); + + // A year should be enough... + assert_timeout!(Duration::from_secs(365 * 24 * 3600), receiver.read_u8()); + } + + #[tokio::test(start_paused = true)] + async fn test_write() { + let (mut receiver, sender) = tokio::io::simplex(1); + let mut sender = IdleTimeout::new(sender, Some(Duration::from_secs(3))); + + // First byte can immediately write. + sender.write_u8(1).await.unwrap(); + // Second write, is blocked on the 1 byte sized buffer. + assert_timeout!(Duration::from_millis(2900), sender.write_u8(2)); + assert_timeout!(Duration::from_millis(70), sender.write_u8(2)); + assert_timeout!(Duration::from_millis(29), sender.write_u8(2)); + + // Consume the blocking byte and write successfully. 
+ assert_eq!(receiver.read_u8().await.unwrap(), 1); + sender.write_u8(2).await.unwrap(); + + // Timeout must be reset. + assert_timeout!(Duration::from_millis(2900), sender.write_u8(3)); + assert_timeout!(Duration::from_millis(70), sender.write_u8(3)); + assert_timeout!(Duration::from_millis(29), sender.write_u8(3)); + + // Only now it should fail. + assert_eq!( + sender.write_u8(3).await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } + + #[tokio::test(start_paused = true)] + async fn test_write_no_timeout() { + let (_receiver, sender) = tokio::io::simplex(1); + let mut sender = IdleTimeout::new(sender, None); + + sender.write_u8(1).await.unwrap(); + // A year should be enough... + assert_timeout!(Duration::from_secs(365 * 24 * 3600), sender.write_u8(2)); + } + + #[tokio::test(start_paused = true)] + async fn test_read_write() { + let stream = SimplexStream::new_unsplit(1); + let mut stream = IdleTimeout::new(stream, Some(Duration::from_secs(3))); + + // First byte can immediately write. + stream.write_u8(1).await.unwrap(); + // And read. + assert_eq!(stream.read_u8().await.unwrap(), 1); + + // The buffer is empty, but we should not time out. + assert_timeout!(Duration::from_millis(2900), stream.read_u8()); + assert_timeout!(Duration::from_millis(70), stream.read_u8()); + assert_timeout!(Duration::from_millis(29), stream.read_u8()); + + // A write resets the read timer. + stream.write_u8(2).await.unwrap(); + tokio::time::advance(Duration::from_millis(2900)).await; + assert_eq!(stream.read_u8().await.unwrap(), 2); + + // Same for writes. 
+ stream.write_u8(3).await.unwrap(); + assert_timeout!(Duration::from_millis(2900), stream.write_u8(3)); + assert_timeout!(Duration::from_millis(70), stream.write_u8(3)); + assert_timeout!(Duration::from_millis(29), stream.write_u8(3)); + + assert_eq!(stream.read_u8().await.unwrap(), 3); + tokio::time::advance(Duration::from_millis(2900)).await; + stream.write_u8(99).await.unwrap(); + + // Buffer is full and no one is clearing it, this should fail. + assert_eq!( + stream.write_u8(0).await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + + // Make sure reads are also timing out. + assert_eq!(stream.read_u8().await.unwrap(), 99); + assert_eq!( + stream.read_u8().await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } +} diff --git a/relay-server/src/services/server/mod.rs b/relay-server/src/services/server/mod.rs index 264b040047..d2661c1e98 100644 --- a/relay-server/src/services/server/mod.rs +++ b/relay-server/src/services/server/mod.rs @@ -115,11 +115,12 @@ fn listen(config: &Config) -> Result { fn serve(listener: TcpListener, app: App, config: Arc) { let handle = Handle::new(); + let acceptor = self::acceptor::RelayAcceptor::new() + .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES) + .idle_timeout(config.idle_timeout()); + let mut server = axum_server::from_tcp(listener) - .acceptor(self::acceptor::RelayAcceptor::new( - &config, - KEEPALIVE_RETRIES, - )) + .acceptor(acceptor) .handle(handle.clone()); server From 730317cbfc42c0f0f7ac84f7f3d3a78abd8e9073 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 14 Nov 2024 09:35:46 +0100 Subject: [PATCH 3/3] review changes --- CHANGELOG.md | 2 +- relay-server/src/services/server/acceptor.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc02f4d1c0..847d74f866 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ - Allow `sample_rate` to be float type when deserializing `DynamicSamplingContext`. 
([#4181](https://github.com/getsentry/relay/pull/4181)) - Support inbound filters for profiles. ([#4176](https://github.com/getsentry/relay/pull/4176)) - Scrub lower-case redis commands. ([#4235](https://github.com/getsentry/relay/pull/4235)) -- Makes the maximum idle time of a HTTP connection configurable. ([#4248](https://github.com/getsentry/relay/pull/4248)) +- Make the maximum idle time of a HTTP connection configurable. ([#4248](https://github.com/getsentry/relay/pull/4248)) **Internal**: diff --git a/relay-server/src/services/server/acceptor.rs b/relay-server/src/services/server/acceptor.rs index eb684a3c6f..d8607e91be 100644 --- a/relay-server/src/services/server/acceptor.rs +++ b/relay-server/src/services/server/acceptor.rs @@ -32,10 +32,10 @@ impl RelayAcceptor { return self; } - let mut ka = socket2::TcpKeepalive::new().with_time(timeout); + let mut keepalive = socket2::TcpKeepalive::new().with_time(timeout); #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] { - ka = ka.with_interval(timeout); + keepalive = keepalive.with_interval(timeout); } #[cfg(not(any( target_os = "openbsd", @@ -44,9 +44,9 @@ impl RelayAcceptor { target_os = "windows" )))] { - ka = ka.with_retries(retries); + keepalive = keepalive.with_retries(retries); } - self.tcp_keepalive = Some(ka); + self.tcp_keepalive = Some(keepalive); self }