feat: enable SO_REUSEPORT on the listener for tests

This could potentially help with port binding issues on the CI server.
Creates a socket2 with SO_REUSEPORT enabled.
praveen/fix-tests
Praveen Kumar 2024-10-09 10:31:21 +01:00
parent 1f1125c767
commit e201622fdb
No known key found for this signature in database
GPG Key ID: CB9E05780A79EA5A
5 changed files with 35 additions and 5 deletions

2
Cargo.lock generated
View File

@ -2623,6 +2623,7 @@ dependencies = [
"secrecy",
"serde_json",
"sha2",
"socket2",
"test-log",
"test_helpers",
"thiserror",
@ -2780,6 +2781,7 @@ dependencies = [
"service_common",
"service_grpc_flight",
"sha2",
"socket2",
"test-log",
"test_helpers",
"thiserror",

View File

@ -99,6 +99,7 @@ serde_urlencoded = "0.7.0"
serde_with = "3.8.1"
sha2 = "0.10.8"
snap = "1.0.0"
socket2 = "0.5.7"
sqlparser = "0.48.0"
sysinfo = "0.30.8"
test-log = { version = "0.2.16", features = ["trace"] }

View File

@ -56,6 +56,7 @@ uuid.workspace = true
# Optional Dependencies
console-subscriber = { version = "0.1.10", optional = true, features = ["parking_lot"] }
socket2 = "0.5.7"
[features]
default = ["jemalloc_replacing_malloc", "azure", "gcp", "aws"]

View File

@ -30,10 +30,11 @@ use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use socket2::{Domain, Type};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{num::NonZeroUsize, sync::Arc};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::net::TcpListener as TokioTcpListener;
use tokio_util::sync::CancellationToken;
use trace_exporters::TracingConfig;
use trace_http::ctx::TraceHeaderParser;
@ -460,6 +461,10 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&telemetry_store),
)?;
let sock_addr: std::net::SocketAddr = *config.http_bind_address;
let listener = setup_tokio_tcp_listener(sock_addr)?;
let query_executor = Arc::new(QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::clone(&write_buffer),
@ -471,10 +476,6 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&telemetry_store),
));
let listener = TcpListener::bind(*config.http_bind_address)
.await
.map_err(Error::BindAddress)?;
let builder = ServerBuilder::new(common_state)
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
@ -552,3 +553,27 @@ fn parse_datafusion_config(
Ok(out)
}
#[cfg(windows)]
fn setup_tokio_tcp_listener(sock_addr: std::net::SocketAddr) -> Result<TokioTcpListener> {
let socket = socket2::Socket::new(Domain::IPV4, Type::STREAM, None).expect("create socket");
socket.bind(&sock_addr.into()).expect("bind socket");
socket.listen(1).expect("listening on socket");
let listener: std::net::TcpListener = socket.into();
let listener = TokioTcpListener::from_std(listener);
listener.map_err(Error::BindAddress)
}
#[cfg(not(windows))]
fn setup_tokio_tcp_listener(sock_addr: std::net::SocketAddr) -> Result<TokioTcpListener> {
let socket = socket2::Socket::new(Domain::IPV4, Type::STREAM, None).expect("create socket");
socket.bind(&sock_addr.into()).expect("bind socket");
socket.set_reuse_address(true).expect("setup reuse addr");
socket.set_reuse_port(true).expect("setup reuse port");
socket.listen(1).expect("listening on socket");
let listener: std::net::TcpListener = socket.into();
let listener = TokioTcpListener::from_std(listener);
listener.map_err(Error::BindAddress)
}

View File

@ -65,6 +65,7 @@ serde.workspace = true
serde_json.workspace = true
serde_urlencoded.workspace = true
sha2.workspace = true
socket2.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true