refactor: use typed `TestConfig` rather than environment variable names for NG end to end tests (#4126)
* refactor: move environment variable mapping in end to end tests into TestConfig * fix: clippy Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
39d9f30f12
commit
e222acbb48
|
@ -1,18 +1,14 @@
|
|||
use arrow_util::assert_batches_sorted_eq;
|
||||
use http::StatusCode;
|
||||
use tempfile::TempDir;
|
||||
use test_helpers_end_to_end_ng::{
|
||||
maybe_skip_integration, query_until_results, rand_name, write_to_router, ServerFixture,
|
||||
ServerType, TestConfig,
|
||||
TestConfig,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn smoke() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let write_buffer_dir = TempDir::new().unwrap();
|
||||
let write_buffer_string = write_buffer_dir.path().display().to_string();
|
||||
let n_sequencers = "1";
|
||||
let org = rand_name();
|
||||
let bucket = rand_name();
|
||||
let namespace = format!("{}_{}", org, bucket);
|
||||
|
@ -20,18 +16,9 @@ async fn smoke() {
|
|||
|
||||
// Set up all_in_one ====================================
|
||||
|
||||
let test_config = TestConfig::new(ServerType::AllInOne)
|
||||
.with_postgres_catalog(&database_url)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file")
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", n_sequencers)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0")
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0")
|
||||
// Aggressive expulsion of parquet files
|
||||
.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2")
|
||||
.with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "1");
|
||||
let test_config = TestConfig::new_all_in_one(database_url);
|
||||
|
||||
let all_in_one = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
let all_in_one = ServerFixture::create(test_config).await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
|
|
|
@ -1,19 +1,15 @@
|
|||
use http::StatusCode;
|
||||
use test_helpers_end_to_end_ng::{
|
||||
maybe_skip_integration, rand_name, write_to_router, ServerFixture, ServerType, TestConfig,
|
||||
maybe_skip_integration, rand_name, write_to_router, ServerFixture, TestConfig,
|
||||
};
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use data_types2::{IngesterQueryRequest, SequencerId};
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn router2_through_ingester() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let write_buffer_dir = TempDir::new().unwrap();
|
||||
let write_buffer_string = write_buffer_dir.path().display().to_string();
|
||||
let n_sequencers = 1;
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
let org = rand_name();
|
||||
let bucket = rand_name();
|
||||
|
@ -22,15 +18,10 @@ async fn router2_through_ingester() {
|
|||
|
||||
// Set up router2 ====================================
|
||||
|
||||
let test_config = TestConfig::new(ServerType::Router2)
|
||||
.with_postgres_catalog(&database_url)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file")
|
||||
.with_env(
|
||||
"INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS",
|
||||
n_sequencers.to_string(),
|
||||
)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string);
|
||||
let router2 = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
let router2_config = TestConfig::new_router2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&router2_config);
|
||||
|
||||
let router2 = ServerFixture::create(router2_config).await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
|
@ -40,20 +31,7 @@ async fn router2_through_ingester() {
|
|||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Set up ingester ===================================
|
||||
|
||||
let test_config = TestConfig::new(ServerType::Ingester)
|
||||
.with_postgres_catalog(&database_url)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file")
|
||||
.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20")
|
||||
.with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10")
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0")
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0")
|
||||
.with_env(
|
||||
"INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS",
|
||||
n_sequencers.to_string(),
|
||||
);
|
||||
let ingester = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
let ingester = ServerFixture::create(ingester_config).await;
|
||||
|
||||
let mut querier_flight =
|
||||
querier::flight::Client::new(ingester.server().ingester_grpc_connection());
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use http::{header::HeaderName, HeaderValue};
|
||||
use tempfile::TempDir;
|
||||
|
||||
use super::ServerType;
|
||||
|
||||
/// Options for creating test servers (`influxdb_iox` processes)
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TestConfig {
|
||||
/// Additional environment variables to pass
|
||||
env: Vec<(String, String)>,
|
||||
/// environment variables to pass to server process. HashMap to avoid duplication
|
||||
env: HashMap<String, String>,
|
||||
|
||||
/// Headers to add to all client requests
|
||||
client_headers: Vec<(HeaderName, HeaderValue)>,
|
||||
|
@ -15,45 +18,113 @@ pub struct TestConfig {
|
|||
server_type: ServerType,
|
||||
|
||||
/// Catalog DSN value
|
||||
dsn: Option<String>,
|
||||
dsn: String,
|
||||
|
||||
/// Write buffer directory, if needed
|
||||
write_buffer_dir: Option<Arc<TempDir>>,
|
||||
}
|
||||
|
||||
impl TestConfig {
|
||||
pub fn new(server_type: ServerType) -> Self {
|
||||
/// Create a new TestConfig (tests should use one of the specific
|
||||
/// configuration setup below, such as [new_router2]
|
||||
fn new(server_type: ServerType, dsn: impl Into<String>) -> Self {
|
||||
Self {
|
||||
env: vec![],
|
||||
env: HashMap::new(),
|
||||
client_headers: vec![],
|
||||
server_type,
|
||||
dsn: None,
|
||||
dsn: dsn.into(),
|
||||
write_buffer_dir: None,
|
||||
}
|
||||
}
|
||||
|
||||
// change server type
|
||||
pub fn with_server_type(mut self, server_type: ServerType) -> Self {
|
||||
self.server_type = server_type;
|
||||
self
|
||||
/// Create a minimal router2 configuration
|
||||
pub fn new_router2(dsn: impl Into<String>) -> Self {
|
||||
Self::new(ServerType::Router2, dsn).with_new_write_buffer()
|
||||
}
|
||||
|
||||
/// Set Postgres catalog DSN URL
|
||||
pub fn with_postgres_catalog(mut self, dsn: &str) -> Self {
|
||||
self.dsn = Some(dsn.into());
|
||||
/// Create a minimal ingester configuration, using the dsn and
|
||||
/// write buffer configuration from other
|
||||
pub fn new_ingester(other: &TestConfig) -> Self {
|
||||
Self::new(ServerType::Ingester, other.dsn())
|
||||
.with_existing_write_buffer(other)
|
||||
.with_default_ingester_options()
|
||||
}
|
||||
|
||||
self
|
||||
/// Create a minimal all in one configuration
|
||||
pub fn new_all_in_one(dsn: impl Into<String>) -> Self {
|
||||
Self::new(ServerType::AllInOne, dsn)
|
||||
.with_new_write_buffer()
|
||||
.with_default_ingester_options()
|
||||
// Aggressive expulsion of parquet files
|
||||
.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2")
|
||||
.with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "1")
|
||||
}
|
||||
|
||||
// Get the catalog DSN URL and panic if it's not set
|
||||
pub fn dsn(&self) -> &str {
|
||||
self.dsn
|
||||
.as_ref()
|
||||
.expect("Test Config must have a catalog configured")
|
||||
&self.dsn
|
||||
}
|
||||
|
||||
// add a name=value environment variable when starting the server
|
||||
pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.env.push((name.into(), value.into()));
|
||||
/// Adds default ingester options
|
||||
fn with_default_ingester_options(self) -> Self {
|
||||
self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20")
|
||||
.with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10")
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0")
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0")
|
||||
}
|
||||
|
||||
/// add a name=value environment variable when starting the server
|
||||
///
|
||||
/// Should not be called directly, but instead all mapping to
|
||||
/// environment variables should be done via this structure
|
||||
fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.env.insert(name.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// copy the specified environment variables from other; Panic's if they do not exist.
|
||||
///
|
||||
/// Should not be called directly, but instead all mapping to
|
||||
/// environment variables should be done via this structure
|
||||
fn copy_env(self, name: impl Into<String>, other: &TestConfig) -> Self {
|
||||
let name = name.into();
|
||||
let value = match other.env.get(&name) {
|
||||
Some(v) => v.clone(),
|
||||
None => panic!(
|
||||
"Can not copy {} from existing config. Available values are: {:#?}",
|
||||
name, other.env
|
||||
),
|
||||
};
|
||||
|
||||
self.with_env(name, value)
|
||||
}
|
||||
|
||||
/// Configures a new write buffer
|
||||
pub fn with_new_write_buffer(mut self) -> Self {
|
||||
let n_sequencers = 1;
|
||||
let tmpdir = TempDir::new().expect("can not create tmp dir");
|
||||
let write_buffer_string = tmpdir.path().display().to_string();
|
||||
self.write_buffer_dir = Some(Arc::new(tmpdir));
|
||||
|
||||
self.with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file")
|
||||
.with_env(
|
||||
"INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS",
|
||||
n_sequencers.to_string(),
|
||||
)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string)
|
||||
}
|
||||
|
||||
/// Configures this TestConfig to use the same write buffer as other
|
||||
pub fn with_existing_write_buffer(mut self, other: &TestConfig) -> Self {
|
||||
// get the directory, if any
|
||||
self.write_buffer_dir = other.write_buffer_dir.clone();
|
||||
|
||||
// copy the environment variables
|
||||
self.copy_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", other)
|
||||
.copy_env("INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", other)
|
||||
.copy_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", other)
|
||||
}
|
||||
|
||||
// add a name=value http header to all client requests made to the server
|
||||
pub fn with_client_header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Self {
|
||||
self.client_headers.push((
|
||||
|
@ -70,9 +141,8 @@ impl TestConfig {
|
|||
}
|
||||
|
||||
/// Get a reference to the test config's env.
|
||||
#[must_use]
|
||||
pub fn env(&self) -> &[(String, String)] {
|
||||
self.env.as_ref()
|
||||
pub fn env(&self) -> impl Iterator<Item = (&str, &str)> {
|
||||
self.env.iter().map(|(k, v)| (k.as_str(), v.as_str()))
|
||||
}
|
||||
|
||||
/// Get a reference to the test config's client headers.
|
||||
|
|
|
@ -25,14 +25,7 @@ impl ServerFixture {
|
|||
/// Create a new server fixture and wait for it to be ready. This
|
||||
/// is called "create" rather than new because it is async and
|
||||
/// waits. The server is not shared with any other tests.
|
||||
pub async fn create_single_use(server_type: ServerType) -> Self {
|
||||
let test_config = TestConfig::new(server_type);
|
||||
Self::create_single_use_with_config(test_config).await
|
||||
}
|
||||
|
||||
/// Create a new server fixture with the provided additional environment variables
|
||||
/// and wait for it to be ready. The server is not shared with any other tests.
|
||||
pub async fn create_single_use_with_config(test_config: TestConfig) -> Self {
|
||||
pub async fn create(test_config: TestConfig) -> Self {
|
||||
let mut server = TestServer::new(test_config).await;
|
||||
|
||||
// ensure the server is ready
|
||||
|
@ -123,14 +116,7 @@ impl TestServer {
|
|||
let dir = test_helpers::tmp_dir().unwrap();
|
||||
|
||||
let server_process = Arc::new(Mutex::new(
|
||||
Self::create_server_process(
|
||||
&addrs,
|
||||
&dir,
|
||||
test_config.dsn(),
|
||||
test_config.env(),
|
||||
test_config.server_type(),
|
||||
)
|
||||
.await,
|
||||
Self::create_server_process(&addrs, &dir, &test_config).await,
|
||||
));
|
||||
|
||||
Self {
|
||||
|
@ -243,23 +229,15 @@ impl TestServer {
|
|||
let mut server_process = self.server_process.lock().await;
|
||||
server_process.child.kill().unwrap();
|
||||
server_process.child.wait().unwrap();
|
||||
*server_process = Self::create_server_process(
|
||||
&self.addrs,
|
||||
&self.dir,
|
||||
self.test_config.dsn(),
|
||||
self.test_config.env(),
|
||||
self.test_config.server_type(),
|
||||
)
|
||||
.await;
|
||||
*server_process =
|
||||
Self::create_server_process(&self.addrs, &self.dir, &self.test_config).await;
|
||||
*ready_guard = ServerState::Started;
|
||||
}
|
||||
|
||||
async fn create_server_process(
|
||||
addrs: &BindAddresses,
|
||||
dir: &TempDir,
|
||||
dsn: &str,
|
||||
env: &[(String, String)],
|
||||
server_type: ServerType,
|
||||
object_store_dir: &TempDir,
|
||||
test_config: &TestConfig,
|
||||
) -> Process {
|
||||
// Create a new file each time and keep it around to aid debugging
|
||||
let (log_file, log_path) = NamedTempFile::new()
|
||||
|
@ -272,6 +250,8 @@ impl TestServer {
|
|||
.expect("cloning file handle for stdout");
|
||||
let stderr_log_file = log_file;
|
||||
|
||||
let server_type = test_config.server_type();
|
||||
|
||||
println!("****************");
|
||||
println!("Server {:?} Logging to {:?}", server_type, log_path);
|
||||
println!("****************");
|
||||
|
@ -282,6 +262,7 @@ impl TestServer {
|
|||
|
||||
let run_command = server_type.run_command();
|
||||
|
||||
let dsn = test_config.dsn();
|
||||
initialize_db(dsn).await;
|
||||
|
||||
// This will inherit environment from the test runner
|
||||
|
@ -293,10 +274,10 @@ impl TestServer {
|
|||
.env("LOG_FILTER", log_filter)
|
||||
.env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
|
||||
.env("INFLUXDB_IOX_OBJECT_STORE", "file")
|
||||
.env("INFLUXDB_IOX_DB_DIR", dir.path())
|
||||
.env("INFLUXDB_IOX_DB_DIR", object_store_dir.path())
|
||||
// add http/grpc address information
|
||||
.add_addr_env(server_type, addrs)
|
||||
.envs(env.iter().map(|(a, b)| (a.as_str(), b.as_str())))
|
||||
.envs(test_config.env())
|
||||
// redirect output to log file
|
||||
.stdout(stdout_log_file)
|
||||
.stderr(stderr_log_file)
|
||||
|
|
Loading…
Reference in New Issue