diff --git a/tests/common/server_fixture.rs b/tests/common/server_fixture.rs index c0af48bf76..18761c9784 100644 --- a/tests/common/server_fixture.rs +++ b/tests/common/server_fixture.rs @@ -1,6 +1,7 @@ use assert_cmd::prelude::*; use std::{ fs::OpenOptions, + path::Path, process::{Child, Command}, str, sync::{ @@ -21,7 +22,7 @@ use tempfile::TempDir; static NEXT_PORT: AtomicUsize = AtomicUsize::new(8090); /// This structure contains all the addresses a test server should use -struct BindAddresses { +pub struct BindAddresses { http_port: usize, grpc_port: usize, @@ -34,8 +35,18 @@ struct BindAddresses { } impl BindAddresses { + pub fn http_bind_addr(&self) -> &str { + &self.http_bind_addr + } + + pub fn grpc_bind_addr(&self) -> &str { + &self.grpc_bind_addr + } +} + +impl Default for BindAddresses { /// return a new port assignment suitable for this test's use - fn new() -> Self { + fn default() -> Self { let http_port = NEXT_PORT.fetch_add(1, SeqCst); let grpc_port = NEXT_PORT.fetch_add(1, SeqCst); @@ -228,6 +239,11 @@ impl ServerFixture { tokio::time::sleep(Duration::from_millis(100)).await; } } + + /// Directory used for data storage. + pub fn dir(&self) -> &Path { + &self.server.dir.path() + } } #[derive(Debug)] @@ -255,7 +271,7 @@ struct TestServer { impl TestServer { fn new() -> Self { - let addrs = BindAddresses::new(); + let addrs = BindAddresses::default(); let ready = Mutex::new(ServerState::Started); let dir = test_helpers::tmp_dir().unwrap(); @@ -309,8 +325,8 @@ impl TestServer { .arg("-vv") .env("INFLUXDB_IOX_OBJECT_STORE", "file") .env("INFLUXDB_IOX_DB_DIR", dir.path()) - .env("INFLUXDB_IOX_BIND_ADDR", &addrs.http_bind_addr) - .env("INFLUXDB_IOX_GRPC_BIND_ADDR", &addrs.grpc_bind_addr) + .env("INFLUXDB_IOX_BIND_ADDR", addrs.http_bind_addr()) + .env("INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.grpc_bind_addr()) // redirect output to log file .stdout(stdout_log_file) .stderr(stderr_log_file) @@ -332,33 +348,7 @@ impl TestServer { // Poll the RPC and HTTP servers separately as they listen on // different ports but both need to be up for the test to run - let try_grpc_connect = async { - let mut interval = tokio::time::interval(Duration::from_millis(500)); - - loop { - match self.grpc_channel().await { - Ok(channel) => { - println!("Successfully connected to server"); - - let mut health = influxdb_iox_client::health::Client::new(channel); - - match health.check_storage().await { - Ok(_) => { - println!("Storage service is running"); - return; - } - Err(e) => { - println!("Error checking storage service status: {}", e); - } - } - } - Err(e) => { - println!("Waiting for gRPC API to be up: {}", e); - } - } - interval.tick().await; - } - }; + let try_grpc_connect = wait_for_grpc(self.addrs()); let try_http_connect = async { let client = reqwest::Client::new(); @@ -453,9 +443,7 @@ impl TestServer { async fn grpc_channel( &self, ) -> influxdb_iox_client::connection::Result { - influxdb_iox_client::connection::Builder::default() - .build(&self.addrs().grpc_base) - .await + grpc_channel(&self.addrs).await } fn addrs(&self) -> &BindAddresses { @@ -463,6 +451,43 @@ impl TestServer { } } +/// Create a connection channel for the gRPR endpoint +pub async fn grpc_channel( + addrs: &BindAddresses, +) -> influxdb_iox_client::connection::Result { + influxdb_iox_client::connection::Builder::default() + .build(&addrs.grpc_base) + .await +} + +pub async fn wait_for_grpc(addrs: &BindAddresses) { + let mut interval = tokio::time::interval(Duration::from_millis(500)); + + loop { + match grpc_channel(addrs).await { + Ok(channel) => { + println!("Successfully connected to server"); + + let mut health = influxdb_iox_client::health::Client::new(channel); + + match health.check_storage().await { + Ok(_) => { + println!("Storage service is running"); + return; + } + Err(e) => { + println!("Error checking storage service status: {}", e); + } + } + } + Err(e) => { + println!("Waiting for gRPC API to be up: {}", e); + } + } + interval.tick().await; + } +} + impl std::fmt::Display for TestServer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { write!(