From e201622fdbafd98104e3fb795807e3795ce59fa3 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Wed, 9 Oct 2024 10:31:21 +0100 Subject: [PATCH] feat: enable SO_REUSEPORT on the listener for tests This could potentially help with port binding issues on the CI server. Creates a socket2 with SO_REUSEPORT enabled. --- Cargo.lock | 2 ++ Cargo.toml | 1 + influxdb3/Cargo.toml | 1 + influxdb3/src/commands/serve.rs | 35 ++++++++++++++++++++++++++++----- influxdb3_server/Cargo.toml | 1 + 5 files changed, 35 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5bdee84c16c..6b9feae5047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2623,6 +2623,7 @@ dependencies = [ "secrecy", "serde_json", "sha2", + "socket2", "test-log", "test_helpers", "thiserror", @@ -2780,6 +2781,7 @@ dependencies = [ "service_common", "service_grpc_flight", "sha2", + "socket2", "test-log", "test_helpers", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index a3b20c951ac..4bcbcaefe20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ serde_urlencoded = "0.7.0" serde_with = "3.8.1" sha2 = "0.10.8" snap = "1.0.0" +socket2 = "0.5.7" sqlparser = "0.48.0" sysinfo = "0.30.8" test-log = { version = "0.2.16", features = ["trace"] } diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index dee47b0ef7a..236adf03198 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -56,6 +56,7 @@ uuid.workspace = true # Optional Dependencies console-subscriber = { version = "0.1.10", optional = true, features = ["parking_lot"] } +socket2 = "0.5.7" [features] default = ["jemalloc_replacing_malloc", "azure", "gcp", "aws"] diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index a4d85930dbd..7052009b56c 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -30,10 +30,11 @@ use object_store::ObjectStore; use observability_deps::tracing::*; use panic_logging::SendPanicsToTracing; use parquet_file::storage::{ParquetStorage, StorageId}; +use socket2::{Domain, Type}; use std::{collections::HashMap, path::Path, str::FromStr}; use std::{num::NonZeroUsize, sync::Arc}; use thiserror::Error; -use tokio::net::TcpListener; +use tokio::net::TcpListener as TokioTcpListener; use tokio_util::sync::CancellationToken; use trace_exporters::TracingConfig; use trace_http::ctx::TraceHeaderParser; @@ -460,6 +461,10 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&telemetry_store), )?; + let sock_addr: std::net::SocketAddr = *config.http_bind_address; + + let listener = setup_tokio_tcp_listener(sock_addr)?; + let query_executor = Arc::new(QueryExecutorImpl::new( write_buffer.catalog(), Arc::clone(&write_buffer), @@ -471,10 +476,6 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&telemetry_store), )); - let listener = TcpListener::bind(*config.http_bind_address) - .await - .map_err(Error::BindAddress)?; - let builder = ServerBuilder::new(common_state) .max_request_size(config.max_http_request_size) .write_buffer(write_buffer) @@ -552,3 +553,27 @@ fn parse_datafusion_config( Ok(out) } + +#[cfg(windows)] +fn setup_tokio_tcp_listener(sock_addr: std::net::SocketAddr) -> Result { + let socket = socket2::Socket::new(Domain::IPV4, Type::STREAM, None).expect("create socket"); + socket.bind(&sock_addr.into()).expect("bind socket"); + socket.listen(1).expect("listening on socket"); + + let listener: std::net::TcpListener = socket.into(); + let listener = TokioTcpListener::from_std(listener); + listener.map_err(Error::BindAddress) +} + +#[cfg(not(windows))] +fn setup_tokio_tcp_listener(sock_addr: std::net::SocketAddr) -> Result { + let socket = socket2::Socket::new(Domain::IPV4, Type::STREAM, None).expect("create socket"); + socket.bind(&sock_addr.into()).expect("bind socket"); + socket.set_reuse_address(true).expect("setup reuse addr"); + socket.set_reuse_port(true).expect("setup reuse port"); + socket.listen(1).expect("listening on socket"); + + let listener: std::net::TcpListener = socket.into(); + let listener = TokioTcpListener::from_std(listener); + listener.map_err(Error::BindAddress) +} diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index a02c70176ef..fa042093cb5 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -65,6 +65,7 @@ serde.workspace = true serde_json.workspace = true serde_urlencoded.workspace = true sha2.workspace = true +socket2.workspace = true thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true