diff --git a/Cargo.lock b/Cargo.lock
index d5f3f78dd5..67689faeba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6007,6 +6007,7 @@ dependencies = [
  "http",
  "hyper",
  "influxdb_iox_client",
+ "nix",
  "once_cell",
  "rand",
  "reqwest",
diff --git a/clap_blocks/src/catalog_dsn.rs b/clap_blocks/src/catalog_dsn.rs
index b239090ac8..92631ac34f 100644
--- a/clap_blocks/src/catalog_dsn.rs
+++ b/clap_blocks/src/catalog_dsn.rs
@@ -38,6 +38,14 @@ pub struct CatalogDsnConfig {
         default_value = "10"
     )]
     pub max_catalog_connections: u32,
+
+    /// Schema name for PostgreSQL-based catalogs.
+    #[clap(
+        long = "--catalog-postgres-schema-name",
+        env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
+        default_value = iox_catalog::postgres::SCHEMA_NAME,
+    )]
+    pub postgres_schema_name: String,
 }

 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ArgEnum)]
@@ -56,7 +64,7 @@ impl CatalogDsnConfig {
             CatalogType::Postgres => Arc::new(
                 PostgresCatalog::connect(
                     app_name,
-                    iox_catalog::postgres::SCHEMA_NAME,
+                    &self.postgres_schema_name,
                     self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?,
                     self.max_catalog_connections,
                     metrics,
diff --git a/test_helpers_end_to_end_ng/Cargo.toml b/test_helpers_end_to_end_ng/Cargo.toml
index 51e22c2f3d..c70015b00b 100644
--- a/test_helpers_end_to_end_ng/Cargo.toml
+++ b/test_helpers_end_to_end_ng/Cargo.toml
@@ -16,6 +16,7 @@ assert_cmd = "2.0.2"
 futures = "0.3"
 http = "0.2.0"
 hyper = "0.14"
+nix = "0.23"
 once_cell = { version = "1.10.0", features = ["parking_lot"] }
 rand = "0.8.3"
 reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
diff --git a/test_helpers_end_to_end_ng/src/config.rs b/test_helpers_end_to_end_ng/src/config.rs
index 86b3fa6b4a..839b91646d 100644
--- a/test_helpers_end_to_end_ng/src/config.rs
+++ b/test_helpers_end_to_end_ng/src/config.rs
@@ -1,6 +1,7 @@
 use std::{collections::HashMap, sync::Arc};

 use http::{header::HeaderName, HeaderValue};
+use rand::Rng;
 use tempfile::TempDir;

 use crate::addrs::BindAddresses;
@@ -22,6 +23,9 @@ pub struct TestConfig {
     /// Catalog DSN value
     dsn: String,

+    /// Catalog schema name
+    catalog_schema_name: String,
+
     /// Write buffer directory, if needed
     write_buffer_dir: Option<Arc<TempDir>>,

@@ -35,12 +39,17 @@ impl TestConfig {
     /// Create a new TestConfig (tests should use one of the specific
     /// configuration setup below, such as [new_router2]
-    fn new(server_type: ServerType, dsn: impl Into<String>) -> Self {
+    fn new(
+        server_type: ServerType,
+        dsn: impl Into<String>,
+        catalog_schema_name: impl Into<String>,
+    ) -> Self {
         Self {
             env: HashMap::new(),
             client_headers: vec![],
             server_type,
             dsn: dsn.into(),
+            catalog_schema_name: catalog_schema_name.into(),
             write_buffer_dir: None,
             object_store_dir: None,
             addrs: Arc::new(BindAddresses::default()),
         }
@@ -49,7 +58,7 @@ impl TestConfig {

     /// Create a minimal router2 configuration
     pub fn new_router2(dsn: impl Into<String>) -> Self {
-        Self::new(ServerType::Router2, dsn)
+        Self::new(ServerType::Router2, dsn, random_catalog_schema_name())
             .with_new_write_buffer()
             .with_new_object_store()
     }
@@ -57,10 +66,14 @@ impl TestConfig {
     /// Create a minimal ingester configuration, using the dsn and
     /// write buffer configuration from other
     pub fn new_ingester(other: &TestConfig) -> Self {
-        Self::new(ServerType::Ingester, other.dsn())
-            .with_existing_write_buffer(other)
-            .with_existing_object_store(other)
-            .with_default_ingester_options()
+        Self::new(
+            ServerType::Ingester,
+            other.dsn(),
+            other.catalog_schema_name(),
+        )
+        .with_existing_write_buffer(other)
+        .with_existing_object_store(other)
+        .with_default_ingester_options()
     }

     /// Create a minimal querier configuration from the specified
@@ -80,13 +93,17 @@ impl TestConfig {
     /// Create a minimal querier configuration from the specified
     /// ingester configuration, using the same dsn and object store
     pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self {
-        Self::new(ServerType::Querier, ingester_config.dsn())
-            .with_existing_object_store(ingester_config)
+        Self::new(
+            ServerType::Querier,
+            ingester_config.dsn(),
+            ingester_config.catalog_schema_name(),
+        )
+        .with_existing_object_store(ingester_config)
     }

     /// Create a minimal all in one configuration
     pub fn new_all_in_one(dsn: impl Into<String>) -> Self {
-        Self::new(ServerType::AllInOne, dsn)
+        Self::new(ServerType::AllInOne, dsn, random_catalog_schema_name())
             .with_new_write_buffer()
             .with_new_object_store()
             .with_default_ingester_options()
@@ -104,6 +121,11 @@ impl TestConfig {
         &self.dsn
     }

+    /// Get the catalog postgres schema name
+    pub fn catalog_schema_name(&self) -> &str {
+        &self.catalog_schema_name
+    }
+
     /// Adds default ingester options
     fn with_default_ingester_options(self) -> Self {
         self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20")
@@ -220,3 +242,14 @@ impl TestConfig {
         &self.addrs
     }
 }
+
+fn random_catalog_schema_name() -> String {
+    let mut rng = rand::thread_rng();
+
+    (&mut rng)
+        .sample_iter(rand::distributions::Alphanumeric)
+        .filter(|c| c.is_ascii_alphabetic())
+        .take(20)
+        .map(char::from)
+        .collect::<String>()
+}
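Note on the random schema names: each test cluster now gets its own 20-character, ASCII-alphabetic Postgres schema, so many end-to-end tests can share one database without clobbering each other's catalog tables, and a purely alphabetic name can never start with a digit or need identifier quoting. A standalone sketch of that guarantee (the `main` below is hypothetical; it assumes only `rand` 0.8, as pinned above):

    use rand::Rng;

    /// Stand-in for `random_catalog_schema_name` above: 20 characters,
    /// ASCII-alphabetic only, obtained by rejection-filtering Alphanumeric.
    fn random_schema_name() -> String {
        let mut rng = rand::thread_rng();
        (&mut rng)
            .sample_iter(rand::distributions::Alphanumeric)
            .filter(|b| b.is_ascii_alphabetic())
            .take(20)
            .map(char::from)
            .collect()
    }

    fn main() {
        let a = random_schema_name();
        let b = random_schema_name();
        assert_eq!(a.chars().count(), 20);
        assert!(a.chars().all(|c| c.is_ascii_alphabetic()));
        // Distinct configs get distinct schemas (collision odds ~ 52^-20).
        assert_ne!(a, b);
        println!("{a} / {b}");
    }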
diff --git a/test_helpers_end_to_end_ng/src/database.rs b/test_helpers_end_to_end_ng/src/database.rs
index 67da55f584..50427aebfd 100644
--- a/test_helpers_end_to_end_ng/src/database.rs
+++ b/test_helpers_end_to_end_ng/src/database.rs
@@ -3,19 +3,18 @@
 use assert_cmd::Command;
 use once_cell::sync::Lazy;
 use sqlx::{migrate::MigrateDatabase, Postgres};
-use std::sync::Mutex;
+use std::{collections::BTreeSet, sync::Mutex};

 // I really do want to block everything until the database is initialized...
 #[allow(clippy::await_holding_lock)]
-#[allow(clippy::mutex_atomic)]
-static DB_INITIALIZED: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
+static DB_INITIALIZED: Lazy<Mutex<BTreeSet<String>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));

 /// Performs once-per-process database initialization, if necessary
-pub async fn initialize_db(dsn: &str) {
+pub async fn initialize_db(dsn: &str, schema_name: &str) {
     let mut init = DB_INITIALIZED.lock().expect("Mutex poisoned");

     // already done
-    if *init {
+    if init.contains(schema_name) {
         return;
     }
@@ -32,7 +31,8 @@
         .unwrap()
         .arg("catalog")
         .arg("setup")
-        .env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
+        .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
+        .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
         .ok()
         .unwrap();

@@ -43,9 +43,10 @@
         .arg("topic")
         .arg("update")
         .arg("iox-shared")
-        .env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
+        .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
+        .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
         .ok()
         .unwrap();

-    *init = true;
+    init.insert(schema_name.into());
 }
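The guard above changes from a process-wide boolean to a set keyed by schema name: `catalog setup` and `topic update` now run once per schema per test process rather than once overall. The pattern distilled (hypothetical `initialize_once`, with a closure standing in for the real `influxdb_iox` CLI invocations):

    use once_cell::sync::Lazy;
    use std::{collections::BTreeSet, sync::Mutex};

    // Process-wide registry of schemas that have already been set up.
    static INITIALIZED: Lazy<Mutex<BTreeSet<String>>> =
        Lazy::new(|| Mutex::new(BTreeSet::new()));

    /// Runs `setup` at most once per schema name within this process.
    fn initialize_once(schema_name: &str, setup: impl FnOnce()) {
        let mut init = INITIALIZED.lock().expect("Mutex poisoned");
        if init.contains(schema_name) {
            return; // this schema is already set up
        }
        setup();
        init.insert(schema_name.into());
    }

    fn main() {
        let mut runs = 0;
        initialize_once("abc", || runs += 1);
        initialize_once("abc", || runs += 1); // no-op: seen before
        initialize_once("xyz", || runs += 1);
        assert_eq!(runs, 2);
    }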
diff --git a/test_helpers_end_to_end_ng/src/mini_cluster.rs b/test_helpers_end_to_end_ng/src/mini_cluster.rs
index ed814f9c82..dc63c43c25 100644
--- a/test_helpers_end_to_end_ng/src/mini_cluster.rs
+++ b/test_helpers_end_to_end_ng/src/mini_cluster.rs
@@ -71,6 +71,19 @@ impl MiniCluster {
             .server()
     }

+    /// Restart ingester.
+    ///
+    /// This will break all currently connected clients!
+    pub async fn restart_ingester(&mut self) {
+        self.ingester = Some(
+            self.ingester
+                .take()
+                .expect("ingester not initialized")
+                .restart_server()
+                .await,
+        )
+    }
+
     /// Retrieve the underlying querier server, if set
     pub fn querier(&self) -> &TestServer {
         self.querier
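`restart_ingester` moves the server out of its `Option` with `take()` because `restart_server` (see server_fixture.rs below) consumes the old handle and returns a fresh one, and Rust will not let a field be moved out of `&mut self` without leaving a value behind. The pattern in isolation, with hypothetical stand-in types:

    struct Server(u32);

    impl Server {
        // Stand-in for `TestServer::restart_server`: consumes the old
        // handle and hands back the restarted one.
        fn restart(self) -> Server {
            Server(self.0 + 1)
        }
    }

    struct Cluster {
        ingester: Option<Server>,
    }

    impl Cluster {
        fn restart_ingester(&mut self) {
            // `take()` leaves `None` in the field so ownership can pass
            // through the consuming call; the result is then put back.
            self.ingester = Some(
                self.ingester
                    .take()
                    .expect("ingester not initialized")
                    .restart(),
            );
        }
    }

    fn main() {
        let mut cluster = Cluster { ingester: Some(Server(0)) };
        cluster.restart_ingester();
        assert_eq!(cluster.ingester.as_ref().unwrap().0, 1);
    }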
diff --git a/test_helpers_end_to_end_ng/src/server_fixture.rs b/test_helpers_end_to_end_ng/src/server_fixture.rs
index 43c6fc4e58..e7eb3e3b4b 100644
--- a/test_helpers_end_to_end_ng/src/server_fixture.rs
+++ b/test_helpers_end_to_end_ng/src/server_fixture.rs
@@ -3,6 +3,7 @@ use futures::prelude::*;
 use influxdb_iox_client::connection::Connection;
 use std::{
     fmt::Debug,
+    fs::OpenOptions,
     path::Path,
     process::{Child, Command},
     str,
@@ -104,7 +105,9 @@ impl TestServer {
     async fn new(test_config: TestConfig) -> Self {
         let ready = Mutex::new(ServerState::Started);

-        let server_process = Arc::new(Mutex::new(Self::create_server_process(&test_config).await));
+        let server_process = Arc::new(Mutex::new(
+            Self::create_server_process(&test_config, None).await,
+        ));

         Self {
             ready,
@@ -217,18 +220,35 @@ impl TestServer {
     async fn restart(&self) {
         let mut ready_guard = self.ready.lock().await;
         let mut server_process = self.server_process.lock().await;
-        server_process.child.kill().unwrap();
-        server_process.child.wait().unwrap();
-        *server_process = Self::create_server_process(&self.test_config).await;
+        kill_politely(&mut server_process.child, Duration::from_secs(5));
+        *server_process =
+            Self::create_server_process(&self.test_config, Some(server_process.log_path.clone()))
+                .await;
         *ready_guard = ServerState::Started;
     }

-    async fn create_server_process(test_config: &TestConfig) -> Process {
+    async fn create_server_process(
+        test_config: &TestConfig,
+        log_path: Option<Box<Path>>,
+    ) -> Process {
         // Create a new file each time and keep it around to aid debugging
-        let (log_file, log_path) = NamedTempFile::new()
-            .expect("opening log file")
-            .keep()
-            .expect("expected to keep");
+        let (log_file, log_path) = match log_path {
+            Some(log_path) => (
+                OpenOptions::new()
+                    .read(true)
+                    .append(true)
+                    .open(&log_path)
+                    .expect("log file should still be there"),
+                log_path,
+            ),
+            None => {
+                let (log_file, log_path) = NamedTempFile::new()
+                    .expect("opening log file")
+                    .keep()
+                    .expect("expected to keep");
+                (log_file, log_path.into_boxed_path())
+            }
+        };

         let stdout_log_file = log_file
             .try_clone()
@@ -248,7 +268,8 @@ impl TestServer {
         let run_command = server_type.run_command();

         let dsn = test_config.dsn();
-        initialize_db(dsn).await;
+        let schema_name = test_config.catalog_schema_name();
+        initialize_db(dsn, schema_name).await;

         // This will inherit environment from the test runner
         // in particular `LOG_FILTER`
@@ -257,7 +278,8 @@ impl TestServer {
             .arg("run")
             .arg(run_command)
             .env("LOG_FILTER", log_filter)
-            .env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
+            .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
+            .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
             // add http/grpc address information
             .add_addr_env(server_type, test_config.addrs())
             .envs(test_config.env())
@@ -267,10 +289,7 @@ impl TestServer {
             .spawn()
             .unwrap();

-        Process {
-            child,
-            log_path: log_path.into_boxed_path(),
-        }
+        Process { child, log_path }
     }

     /// Polls the various services to ensure the server is
@@ -472,13 +491,7 @@ impl Drop for TestServer {
             .try_lock()
             .expect("should be able to get a server process lock");

-        if let Err(e) = server_lock.child.kill() {
-            println!("Error killing child: {}", e);
-        }
-
-        if let Err(e) = server_lock.child.wait() {
-            println!("Error waiting on child exit: {}", e);
-        }
+        kill_politely(&mut server_lock.child, Duration::from_secs(1));

         dump_log_to_stdout(self.test_config.server_type(), &server_lock.log_path);
     }
@@ -529,3 +542,65 @@ fn dump_log_to_stdout(server_type: ServerType, log_path: &Path) {
     println!("End {:?} TestServer Output", server_type);
     println!("****************");
 }
+
+/// Attempt to kill a child process politely.
+fn kill_politely(child: &mut Child, wait: Duration) {
+    use nix::{
+        sys::{
+            signal::{self, Signal},
+            wait::waitpid,
+        },
+        unistd::Pid,
+    };
+
+    let pid = Pid::from_raw(child.id().try_into().unwrap());
+
+    // try to be polite
+    let wait_errored = match signal::kill(pid, Signal::SIGTERM) {
+        Ok(()) => wait_timeout(pid, wait).is_err(),
+        Err(e) => {
+            println!("Error sending SIGTERM to child: {e}");
+            true
+        }
+    };
+
+    if wait_errored {
+        // timeout => kill it
+        println!("Cannot terminate child politely, using SIGKILL...");
+
+        if let Err(e) = signal::kill(pid, Signal::SIGKILL) {
+            println!("Error sending SIGKILL to child: {e}");
+        }
+        if let Err(e) = waitpid(pid, None) {
+            println!("Cannot wait for child: {e}");
+        }
+    } else {
+        println!("Killed child politely");
+    }
+}
+
+/// Wait for given PID to exit with a timeout.
+fn wait_timeout(pid: nix::unistd::Pid, timeout: Duration) -> Result<(), ()> {
+    use nix::sys::wait::waitpid;
+
+    // use some thread and channel trickery, see https://stackoverflow.com/a/42720480
+    let (sender, receiver) = std::sync::mpsc::channel();
+    std::thread::spawn(move || {
+        let waitpid_res = waitpid(pid, None).map(|_| ());
+
+        // errors if the receiver side is gone which is OK.
+        sender.send(waitpid_res).ok();
+    });
+
+    match receiver.recv_timeout(timeout) {
+        Ok(Ok(())) => Ok(()),
+        Ok(Err(e)) => {
+            println!("Cannot wait for child: {e}");
+            Err(())
+        }
+        Err(_) => {
+            println!("Timeout waiting for child");
+            Err(())
+        }
+    }
+}
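`kill_politely` replaces the old unconditional `kill()` (which on Unix is SIGKILL) with SIGTERM-first escalation, so a server gets up to `wait` seconds to shut down cleanly before being killed outright, and the child is always reaped to avoid zombies. A minimal Unix-only sketch of that escalation, assuming the `nix` 0.23 API used above and spawning `sleep` purely as a demo child:

    use nix::sys::signal::{self, Signal};
    use nix::sys::wait::waitpid;
    use nix::unistd::Pid;
    use std::{process::Command, thread, time::Duration};

    fn main() {
        let child = Command::new("sleep").arg("60").spawn().unwrap();
        let pid = Pid::from_raw(child.id().try_into().unwrap());

        // Be polite first: SIGTERM can be caught, so the process may
        // shut down cleanly (flush logs, close sockets, ...).
        signal::kill(pid, Signal::SIGTERM).unwrap();
        thread::sleep(Duration::from_millis(100)); // grace period

        // Escalate if needed: SIGKILL cannot be caught or ignored.
        // Signaling an already-exited (zombie) process is harmless.
        let _ = signal::kill(pid, Signal::SIGKILL);

        // Reap the child so it does not linger as a zombie.
        waitpid(pid, None).unwrap();
    }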
"time"] } uuid = { version = "0.8", features = ["getrandom", "std", "v4"] } [target.x86_64-unknown-linux-gnu.dependencies] -libc = { version = "0.2", default-features = false, features = ["extra_traits"] } tokio-rustls = { version = "0.23" } -[target.x86_64-unknown-linux-gnu.build-dependencies] -libc = { version = "0.2", default-features = false, features = ["extra_traits"] } - [target.x86_64-apple-darwin.dependencies] -libc = { version = "0.2", default-features = false, features = ["extra_traits"] } tokio-rustls = { version = "0.23" } -[target.x86_64-apple-darwin.build-dependencies] -libc = { version = "0.2", default-features = false, features = ["extra_traits"] } - [target.aarch64-apple-darwin.dependencies] -libc = { version = "0.2", default-features = false, features = ["extra_traits"] } tokio-rustls = { version = "0.23" } -[target.aarch64-apple-darwin.build-dependencies] -libc = { version = "0.2", default-features = false, features = ["extra_traits"] } - [target.x86_64-pc-windows-msvc.dependencies] scopeguard = { version = "1", features = ["use_std"] } tokio = { version = "1", default-features = false, features = ["winapi"] }