test: make some `server_fixture` functionality public

This is useful when you want to test a server boot-up with custom
configs.
pull/24376/head
Marco Neumann 2021-06-10 17:51:47 +02:00
parent 2ea24b6467
commit a449d5ef74
1 changed files with 60 additions and 35 deletions

View File

@ -1,6 +1,7 @@
use assert_cmd::prelude::*; use assert_cmd::prelude::*;
use std::{ use std::{
fs::OpenOptions, fs::OpenOptions,
path::Path,
process::{Child, Command}, process::{Child, Command},
str, str,
sync::{ sync::{
@ -21,7 +22,7 @@ use tempfile::TempDir;
static NEXT_PORT: AtomicUsize = AtomicUsize::new(8090); static NEXT_PORT: AtomicUsize = AtomicUsize::new(8090);
/// This structure contains all the addresses a test server should use /// This structure contains all the addresses a test server should use
struct BindAddresses { pub struct BindAddresses {
http_port: usize, http_port: usize,
grpc_port: usize, grpc_port: usize,
@ -34,8 +35,18 @@ struct BindAddresses {
} }
impl BindAddresses { impl BindAddresses {
pub fn http_bind_addr(&self) -> &str {
&self.http_bind_addr
}
pub fn grpc_bind_addr(&self) -> &str {
&self.grpc_bind_addr
}
}
impl Default for BindAddresses {
/// return a new port assignment suitable for this test's use /// return a new port assignment suitable for this test's use
fn new() -> Self { fn default() -> Self {
let http_port = NEXT_PORT.fetch_add(1, SeqCst); let http_port = NEXT_PORT.fetch_add(1, SeqCst);
let grpc_port = NEXT_PORT.fetch_add(1, SeqCst); let grpc_port = NEXT_PORT.fetch_add(1, SeqCst);
@ -228,6 +239,11 @@ impl ServerFixture {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
} }
/// Directory used for data storage.
pub fn dir(&self) -> &Path {
&self.server.dir.path()
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -255,7 +271,7 @@ struct TestServer {
impl TestServer { impl TestServer {
fn new() -> Self { fn new() -> Self {
let addrs = BindAddresses::new(); let addrs = BindAddresses::default();
let ready = Mutex::new(ServerState::Started); let ready = Mutex::new(ServerState::Started);
let dir = test_helpers::tmp_dir().unwrap(); let dir = test_helpers::tmp_dir().unwrap();
@ -309,8 +325,8 @@ impl TestServer {
.arg("-vv") .arg("-vv")
.env("INFLUXDB_IOX_OBJECT_STORE", "file") .env("INFLUXDB_IOX_OBJECT_STORE", "file")
.env("INFLUXDB_IOX_DB_DIR", dir.path()) .env("INFLUXDB_IOX_DB_DIR", dir.path())
.env("INFLUXDB_IOX_BIND_ADDR", &addrs.http_bind_addr) .env("INFLUXDB_IOX_BIND_ADDR", addrs.http_bind_addr())
.env("INFLUXDB_IOX_GRPC_BIND_ADDR", &addrs.grpc_bind_addr) .env("INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.grpc_bind_addr())
// redirect output to log file // redirect output to log file
.stdout(stdout_log_file) .stdout(stdout_log_file)
.stderr(stderr_log_file) .stderr(stderr_log_file)
@ -332,33 +348,7 @@ impl TestServer {
// Poll the RPC and HTTP servers separately as they listen on // Poll the RPC and HTTP servers separately as they listen on
// different ports but both need to be up for the test to run // different ports but both need to be up for the test to run
let try_grpc_connect = async { let try_grpc_connect = wait_for_grpc(self.addrs());
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
match self.grpc_channel().await {
Ok(channel) => {
println!("Successfully connected to server");
let mut health = influxdb_iox_client::health::Client::new(channel);
match health.check_storage().await {
Ok(_) => {
println!("Storage service is running");
return;
}
Err(e) => {
println!("Error checking storage service status: {}", e);
}
}
}
Err(e) => {
println!("Waiting for gRPC API to be up: {}", e);
}
}
interval.tick().await;
}
};
let try_http_connect = async { let try_http_connect = async {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
@ -453,9 +443,7 @@ impl TestServer {
async fn grpc_channel( async fn grpc_channel(
&self, &self,
) -> influxdb_iox_client::connection::Result<tonic::transport::Channel> { ) -> influxdb_iox_client::connection::Result<tonic::transport::Channel> {
influxdb_iox_client::connection::Builder::default() grpc_channel(&self.addrs).await
.build(&self.addrs().grpc_base)
.await
} }
fn addrs(&self) -> &BindAddresses { fn addrs(&self) -> &BindAddresses {
@ -463,6 +451,43 @@ impl TestServer {
} }
} }
/// Create a connection channel for the gRPR endpoint
pub async fn grpc_channel(
addrs: &BindAddresses,
) -> influxdb_iox_client::connection::Result<tonic::transport::Channel> {
influxdb_iox_client::connection::Builder::default()
.build(&addrs.grpc_base)
.await
}
pub async fn wait_for_grpc(addrs: &BindAddresses) {
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
match grpc_channel(addrs).await {
Ok(channel) => {
println!("Successfully connected to server");
let mut health = influxdb_iox_client::health::Client::new(channel);
match health.check_storage().await {
Ok(_) => {
println!("Storage service is running");
return;
}
Err(e) => {
println!("Error checking storage service status: {}", e);
}
}
}
Err(e) => {
println!("Waiting for gRPC API to be up: {}", e);
}
}
interval.tick().await;
}
}
impl std::fmt::Display for TestServer { impl std::fmt::Display for TestServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!( write!(