From e222acbb483116a5be41f4ae5cc47c9da2e3500e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 24 Mar 2022 14:36:47 -0400 Subject: [PATCH] refactor: use typed `TestConfig` rather than environment variable names for NG end to end tests (#4126) * refactor: move environment variable mapping in end to end tests into TestConfig * fix: clippy Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../tests/end_to_end_ng_cases/all_in_one.rs | 19 +-- influxdb_iox/tests/end_to_end_ng_cases/ng.rs | 34 +---- test_helpers_end_to_end_ng/src/config.rs | 116 ++++++++++++++---- .../src/server_fixture.rs | 41 ++----- 4 files changed, 113 insertions(+), 97 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs index bd629406f9..4f53bce4b2 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs @@ -1,18 +1,14 @@ use arrow_util::assert_batches_sorted_eq; use http::StatusCode; -use tempfile::TempDir; use test_helpers_end_to_end_ng::{ maybe_skip_integration, query_until_results, rand_name, write_to_router, ServerFixture, - ServerType, TestConfig, + TestConfig, }; #[tokio::test] async fn smoke() { let database_url = maybe_skip_integration!(); - let write_buffer_dir = TempDir::new().unwrap(); - let write_buffer_string = write_buffer_dir.path().display().to_string(); - let n_sequencers = "1"; let org = rand_name(); let bucket = rand_name(); let namespace = format!("{}_{}", org, bucket); @@ -20,18 +16,9 @@ async fn smoke() { // Set up all_in_one ==================================== - let test_config = TestConfig::new(ServerType::AllInOne) - .with_postgres_catalog(&database_url) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file") - .with_env("INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", n_sequencers) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0") - .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0") - // Aggressive expulsion of parquet files - .with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2") - .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "1"); + let test_config = TestConfig::new_all_in_one(database_url); - let all_in_one = ServerFixture::create_single_use_with_config(test_config).await; + let all_in_one = ServerFixture::create(test_config).await; // Write some data into the v2 HTTP API ============== let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name); diff --git a/influxdb_iox/tests/end_to_end_ng_cases/ng.rs b/influxdb_iox/tests/end_to_end_ng_cases/ng.rs index 50d4f22d00..e0f0a7f56a 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/ng.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/ng.rs @@ -1,19 +1,15 @@ use http::StatusCode; use test_helpers_end_to_end_ng::{ - maybe_skip_integration, rand_name, write_to_router, ServerFixture, ServerType, TestConfig, + maybe_skip_integration, rand_name, write_to_router, ServerFixture, TestConfig, }; use arrow_util::assert_batches_sorted_eq; use data_types2::{IngesterQueryRequest, SequencerId}; -use tempfile::TempDir; #[tokio::test] async fn router2_through_ingester() { let database_url = maybe_skip_integration!(); - let write_buffer_dir = TempDir::new().unwrap(); - let write_buffer_string = write_buffer_dir.path().display().to_string(); - let n_sequencers = 1; let sequencer_id = SequencerId::new(1); let org = rand_name(); let bucket = rand_name(); @@ -22,15 +18,10 @@ async fn router2_through_ingester() { // Set up router2 ==================================== - let test_config = TestConfig::new(ServerType::Router2) - .with_postgres_catalog(&database_url) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file") - .with_env( - "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", - n_sequencers.to_string(), - ) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string); - let router2 = ServerFixture::create_single_use_with_config(test_config).await; + let router2_config = TestConfig::new_router2(&database_url); + let ingester_config = TestConfig::new_ingester(&router2_config); + + let router2 = ServerFixture::create(router2_config).await; // Write some data into the v2 HTTP API ============== let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name); @@ -40,20 +31,7 @@ async fn router2_through_ingester() { assert_eq!(response.status(), StatusCode::NO_CONTENT); // Set up ingester =================================== - - let test_config = TestConfig::new(ServerType::Ingester) - .with_postgres_catalog(&database_url) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file") - .with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20") - .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10") - .with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string) - .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0") - .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0") - .with_env( - "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", - n_sequencers.to_string(), - ); - let ingester = ServerFixture::create_single_use_with_config(test_config).await; + let ingester = ServerFixture::create(ingester_config).await; let mut querier_flight = querier::flight::Client::new(ingester.server().ingester_grpc_connection()); diff --git a/test_helpers_end_to_end_ng/src/config.rs b/test_helpers_end_to_end_ng/src/config.rs index 76688fe29f..c5811eda93 100644 --- a/test_helpers_end_to_end_ng/src/config.rs +++ b/test_helpers_end_to_end_ng/src/config.rs @@ -1,12 +1,15 @@ +use std::{collections::HashMap, sync::Arc}; + use http::{header::HeaderName, HeaderValue}; +use tempfile::TempDir; use super::ServerType; /// Options for creating test servers (`influxdb_iox` processes) #[derive(Debug, Clone)] pub struct TestConfig { - /// Additional environment variables to pass - env: Vec<(String, String)>, + /// environment variables to pass to server process. HashMap to avoid duplication + env: HashMap, /// Headers to add to all client requests client_headers: Vec<(HeaderName, HeaderValue)>, @@ -15,45 +18,113 @@ pub struct TestConfig { server_type: ServerType, /// Catalog DSN value - dsn: Option, + dsn: String, + + /// Write buffer directory, if needed + write_buffer_dir: Option>, } impl TestConfig { - pub fn new(server_type: ServerType) -> Self { + /// Create a new TestConfig (tests should use one of the specific + /// configuration setup below, such as [new_router2] + fn new(server_type: ServerType, dsn: impl Into) -> Self { Self { - env: vec![], + env: HashMap::new(), client_headers: vec![], server_type, - dsn: None, + dsn: dsn.into(), + write_buffer_dir: None, } } - // change server type - pub fn with_server_type(mut self, server_type: ServerType) -> Self { - self.server_type = server_type; - self + /// Create a minimal router2 configuration + pub fn new_router2(dsn: impl Into) -> Self { + Self::new(ServerType::Router2, dsn).with_new_write_buffer() } - /// Set Postgres catalog DSN URL - pub fn with_postgres_catalog(mut self, dsn: &str) -> Self { - self.dsn = Some(dsn.into()); + /// Create a minimal ingester configuration, using the dsn and + /// write buffer configuration from other + pub fn new_ingester(other: &TestConfig) -> Self { + Self::new(ServerType::Ingester, other.dsn()) + .with_existing_write_buffer(other) + .with_default_ingester_options() + } - self + /// Create a minimal all in one configuration + pub fn new_all_in_one(dsn: impl Into) -> Self { + Self::new(ServerType::AllInOne, dsn) + .with_new_write_buffer() + .with_default_ingester_options() + // Aggressive expulsion of parquet files + .with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2") + .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "1") } // Get the catalog DSN URL and panic if it's not set pub fn dsn(&self) -> &str { - self.dsn - .as_ref() - .expect("Test Config must have a catalog configured") + &self.dsn } - // add a name=value environment variable when starting the server - pub fn with_env(mut self, name: impl Into, value: impl Into) -> Self { - self.env.push((name.into(), value.into())); + /// Adds default ingester options + fn with_default_ingester_options(self) -> Self { + self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20") + .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10") + .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0") + .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0") + } + + /// add a name=value environment variable when starting the server + /// + /// Should not be called directly, but instead all mapping to + /// environment variables should be done via this structure + fn with_env(mut self, name: impl Into, value: impl Into) -> Self { + self.env.insert(name.into(), value.into()); self } + /// copy the specified environment variables from other; Panic's if they do not exist. + /// + /// Should not be called directly, but instead all mapping to + /// environment variables should be done via this structure + fn copy_env(self, name: impl Into, other: &TestConfig) -> Self { + let name = name.into(); + let value = match other.env.get(&name) { + Some(v) => v.clone(), + None => panic!( + "Can not copy {} from existing config. Available values are: {:#?}", + name, other.env + ), + }; + + self.with_env(name, value) + } + + /// Configures a new write buffer + pub fn with_new_write_buffer(mut self) -> Self { + let n_sequencers = 1; + let tmpdir = TempDir::new().expect("can not create tmp dir"); + let write_buffer_string = tmpdir.path().display().to_string(); + self.write_buffer_dir = Some(Arc::new(tmpdir)); + + self.with_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", "file") + .with_env( + "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", + n_sequencers.to_string(), + ) + .with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string) + } + + /// Configures this TestConfig to use the same write buffer as other + pub fn with_existing_write_buffer(mut self, other: &TestConfig) -> Self { + // get the directory, if any + self.write_buffer_dir = other.write_buffer_dir.clone(); + + // copy the environment variables + self.copy_env("INFLUXDB_IOX_WRITE_BUFFER_TYPE", other) + .copy_env("INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", other) + .copy_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", other) + } + // add a name=value http header to all client requests made to the server pub fn with_client_header(mut self, name: impl AsRef, value: impl AsRef) -> Self { self.client_headers.push(( @@ -70,9 +141,8 @@ impl TestConfig { } /// Get a reference to the test config's env. - #[must_use] - pub fn env(&self) -> &[(String, String)] { - self.env.as_ref() + pub fn env(&self) -> impl Iterator { + self.env.iter().map(|(k, v)| (k.as_str(), v.as_str())) } /// Get a reference to the test config's client headers. diff --git a/test_helpers_end_to_end_ng/src/server_fixture.rs b/test_helpers_end_to_end_ng/src/server_fixture.rs index 33ec30777d..f6597ae0d2 100644 --- a/test_helpers_end_to_end_ng/src/server_fixture.rs +++ b/test_helpers_end_to_end_ng/src/server_fixture.rs @@ -25,14 +25,7 @@ impl ServerFixture { /// Create a new server fixture and wait for it to be ready. This /// is called "create" rather than new because it is async and /// waits. The server is not shared with any other tests. - pub async fn create_single_use(server_type: ServerType) -> Self { - let test_config = TestConfig::new(server_type); - Self::create_single_use_with_config(test_config).await - } - - /// Create a new server fixture with the provided additional environment variables - /// and wait for it to be ready. The server is not shared with any other tests. - pub async fn create_single_use_with_config(test_config: TestConfig) -> Self { + pub async fn create(test_config: TestConfig) -> Self { let mut server = TestServer::new(test_config).await; // ensure the server is ready @@ -123,14 +116,7 @@ impl TestServer { let dir = test_helpers::tmp_dir().unwrap(); let server_process = Arc::new(Mutex::new( - Self::create_server_process( - &addrs, - &dir, - test_config.dsn(), - test_config.env(), - test_config.server_type(), - ) - .await, + Self::create_server_process(&addrs, &dir, &test_config).await, )); Self { @@ -243,23 +229,15 @@ impl TestServer { let mut server_process = self.server_process.lock().await; server_process.child.kill().unwrap(); server_process.child.wait().unwrap(); - *server_process = Self::create_server_process( - &self.addrs, - &self.dir, - self.test_config.dsn(), - self.test_config.env(), - self.test_config.server_type(), - ) - .await; + *server_process = + Self::create_server_process(&self.addrs, &self.dir, &self.test_config).await; *ready_guard = ServerState::Started; } async fn create_server_process( addrs: &BindAddresses, - dir: &TempDir, - dsn: &str, - env: &[(String, String)], - server_type: ServerType, + object_store_dir: &TempDir, + test_config: &TestConfig, ) -> Process { // Create a new file each time and keep it around to aid debugging let (log_file, log_path) = NamedTempFile::new() @@ -272,6 +250,8 @@ impl TestServer { .expect("cloning file handle for stdout"); let stderr_log_file = log_file; + let server_type = test_config.server_type(); + println!("****************"); println!("Server {:?} Logging to {:?}", server_type, log_path); println!("****************"); @@ -282,6 +262,7 @@ impl TestServer { let run_command = server_type.run_command(); + let dsn = test_config.dsn(); initialize_db(dsn).await; // This will inherit environment from the test runner @@ -293,10 +274,10 @@ impl TestServer { .env("LOG_FILTER", log_filter) .env("INFLUXDB_IOX_CATALOG_DSN", &dsn) .env("INFLUXDB_IOX_OBJECT_STORE", "file") - .env("INFLUXDB_IOX_DB_DIR", dir.path()) + .env("INFLUXDB_IOX_DB_DIR", object_store_dir.path()) // add http/grpc address information .add_addr_env(server_type, addrs) - .envs(env.iter().map(|(a, b)| (a.as_str(), b.as_str()))) + .envs(test_config.env()) // redirect output to log file .stdout(stdout_log_file) .stderr(stderr_log_file)