diff --git a/src/commands/server.rs b/src/commands/server.rs index 74fc487051..99b0e13f30 100644 --- a/src/commands/server.rs +++ b/src/commands/server.rs @@ -103,7 +103,11 @@ pub async fn main() -> Result<()> { } }; - let grpc_server = storage::make_server(grpc_bind_addr, storage.clone(), executor); + let socket = tokio::net::TcpListener::bind(grpc_bind_addr) + .await + .expect("failed to bind server"); + + let grpc_server = storage::make_server(socket, storage.clone(), executor); info!("gRPC server listening on http://{}", grpc_bind_addr); diff --git a/src/server/rpc/storage.rs b/src/server/rpc/storage.rs index a56d81f94c..855693548a 100644 --- a/src/server/rpc/storage.rs +++ b/src/server/rpc/storage.rs @@ -2,7 +2,7 @@ //! implemented in terms of the `query::Database` and //! `query::DatabaseStore` -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; use generated_types::{ i_ox_server::{IOx, IOxServer}, @@ -39,7 +39,7 @@ use query::{ use snafu::{OptionExt, ResultExt, Snafu}; -use tokio::sync::mpsc; +use tokio::{net::TcpListener, sync::mpsc}; use tonic::Status; use tracing::{info, warn}; @@ -1128,7 +1128,7 @@ where /// underlying hyper server instance. Resolves when the server has /// shutdown. pub async fn make_server( - bind_addr: SocketAddr, + socket: TcpListener, storage: Arc, executor: Arc, ) -> Result<()> @@ -1144,7 +1144,7 @@ where storage.clone(), executor.clone(), ))) - .serve(bind_addr) + .serve_with_incoming(socket) .await .context(ServerError {}) .log_if_error("Running Tonic Server") @@ -1190,9 +1190,7 @@ mod tests { #[tokio::test] async fn test_influxdb_iox_rpc() -> Result<()> { - let mut fixture = Fixture::new(11807) - .await - .expect("Connecting to test server"); + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let org = Organization { id: 1337, @@ -1222,10 +1220,8 @@ mod tests { #[tokio::test] async fn test_storage_rpc_capabilities() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11808) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); // Test response from storage server let mut expected_capabilities: HashMap> = HashMap::new(); @@ -1244,10 +1240,8 @@ mod tests { #[tokio::test] async fn test_storage_rpc_measurement_names() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11809) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1299,10 +1293,8 @@ mod tests { /// and that the returned values are sent back via gRPC. #[tokio::test] async fn test_storage_rpc_tag_keys() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11810) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1380,10 +1372,8 @@ mod tests { /// and that the returned values are sent back via gRPC. #[tokio::test] async fn test_storage_rpc_measurement_tag_keys() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11811) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1476,10 +1466,8 @@ mod tests { /// and that the returned values are sent back via gRPC. #[tokio::test] async fn test_storage_rpc_tag_values() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11812) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1582,10 +1570,8 @@ mod tests { /// and that the returned values are sent back via gRPC. #[tokio::test] async fn test_storage_rpc_measurement_tag_values() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11813) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1677,10 +1663,8 @@ mod tests { // capture all tracing messages let tracing_capture = TracingCapture::new(); - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11900) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let request = TestErrorRequest {}; @@ -1738,10 +1722,8 @@ mod tests { #[tokio::test] async fn test_read_filter() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11901) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1817,10 +1799,8 @@ mod tests { #[tokio::test] async fn test_read_group() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11902) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -1917,10 +1897,8 @@ mod tests { #[tokio::test] async fn test_read_window_aggegate() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11903) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -2078,10 +2056,8 @@ mod tests { #[tokio::test] async fn test_measurement_fields() -> Result<(), tonic::Status> { - // Note we use a unique port. TODO: let the OS pick the port - let mut fixture = Fixture::new(11904) - .await - .expect("Connecting to test server"); + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; @@ -2503,6 +2479,15 @@ mod tests { } } + #[derive(Debug, Snafu)] + pub enum FixtureError { + #[snafu(display("Error binding fixture server: {}", source))] + Bind { source: std::io::Error }, + + #[snafu(display("Error creating fixture: {}", source))] + Tonic { source: tonic::transport::Error }, + } + // Wrapper around raw clients and test database struct Fixture { iox_client: IOxClient, @@ -2514,22 +2499,32 @@ mod tests { impl Fixture { /// Start up a test rpc server listening on `port`, returning /// a fixture with the test server and clients - async fn new(port: u16) -> Result { + async fn new() -> Result { let test_storage = Arc::new(TestDatabaseStore::new()); let test_executor = Arc::new(QueryExecutor::default()); - // TODO: specify port 0 to let the OS pick the port (need to - // figure out how to get access to the actual addr from tonic) - let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + // Get a random port from the kernel by asking for port 0. + let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let socket = tokio::net::TcpListener::bind(bind_addr) + .await + .context(Bind)?; + + // Pull the assigned port out of the socket + let bind_addr = socket.local_addr().unwrap(); println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr); - let server = make_server(bind_addr, test_storage.clone(), test_executor.clone()); + let server = make_server(socket, test_storage.clone(), test_executor.clone()); tokio::task::spawn(server); - let iox_client = connect_to_server::(bind_addr).await?; - let storage_client = - StorageClientWrapper::new(connect_to_server::(bind_addr).await?); + let iox_client = connect_to_server::(bind_addr) + .await + .context(Tonic)?; + let storage_client = StorageClientWrapper::new( + connect_to_server::(bind_addr) + .await + .context(Tonic)?, + ); Ok(Self { iox_client,