Merge branch 'main' into crepererum/kube_071

pull/24376/head
kodiakhq[bot] 2022-04-13 16:05:20 +00:00 committed by GitHub
commit 58763bd575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 174 additions and 54 deletions

1
Cargo.lock generated
View File

@ -6007,6 +6007,7 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"influxdb_iox_client", "influxdb_iox_client",
"nix",
"once_cell", "once_cell",
"rand", "rand",
"reqwest", "reqwest",

View File

@ -38,6 +38,14 @@ pub struct CatalogDsnConfig {
default_value = "10" default_value = "10"
)] )]
pub max_catalog_connections: u32, pub max_catalog_connections: u32,
/// Schema name for PostgreSQL-based catalogs.
#[clap(
long = "--catalog-postgres-schema-name",
env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
default_value = iox_catalog::postgres::SCHEMA_NAME,
)]
pub postgres_schema_name: String,
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ArgEnum)] #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ArgEnum)]
@ -56,7 +64,7 @@ impl CatalogDsnConfig {
CatalogType::Postgres => Arc::new( CatalogType::Postgres => Arc::new(
PostgresCatalog::connect( PostgresCatalog::connect(
app_name, app_name,
iox_catalog::postgres::SCHEMA_NAME, &self.postgres_schema_name,
self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?, self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?,
self.max_catalog_connections, self.max_catalog_connections,
metrics, metrics,

View File

@ -16,6 +16,7 @@ assert_cmd = "2.0.2"
futures = "0.3" futures = "0.3"
http = "0.2.0" http = "0.2.0"
hyper = "0.14" hyper = "0.14"
nix = "0.23"
once_cell = { version = "1.10.0", features = ["parking_lot"] } once_cell = { version = "1.10.0", features = ["parking_lot"] }
rand = "0.8.3" rand = "0.8.3"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }

View File

@ -1,6 +1,7 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use http::{header::HeaderName, HeaderValue}; use http::{header::HeaderName, HeaderValue};
use rand::Rng;
use tempfile::TempDir; use tempfile::TempDir;
use crate::addrs::BindAddresses; use crate::addrs::BindAddresses;
@ -22,6 +23,9 @@ pub struct TestConfig {
/// Catalog DSN value /// Catalog DSN value
dsn: String, dsn: String,
/// Catalog schema name
catalog_schema_name: String,
/// Write buffer directory, if needed /// Write buffer directory, if needed
write_buffer_dir: Option<Arc<TempDir>>, write_buffer_dir: Option<Arc<TempDir>>,
@ -35,12 +39,17 @@ pub struct TestConfig {
impl TestConfig { impl TestConfig {
/// Create a new TestConfig (tests should use one of the specific /// Create a new TestConfig (tests should use one of the specific
/// configuration setup below, such as [new_router2] /// configuration setup below, such as [new_router2]
fn new(server_type: ServerType, dsn: impl Into<String>) -> Self { fn new(
server_type: ServerType,
dsn: impl Into<String>,
catalog_schema_name: impl Into<String>,
) -> Self {
Self { Self {
env: HashMap::new(), env: HashMap::new(),
client_headers: vec![], client_headers: vec![],
server_type, server_type,
dsn: dsn.into(), dsn: dsn.into(),
catalog_schema_name: catalog_schema_name.into(),
write_buffer_dir: None, write_buffer_dir: None,
object_store_dir: None, object_store_dir: None,
addrs: Arc::new(BindAddresses::default()), addrs: Arc::new(BindAddresses::default()),
@ -49,7 +58,7 @@ impl TestConfig {
/// Create a minimal router2 configuration /// Create a minimal router2 configuration
pub fn new_router2(dsn: impl Into<String>) -> Self { pub fn new_router2(dsn: impl Into<String>) -> Self {
Self::new(ServerType::Router2, dsn) Self::new(ServerType::Router2, dsn, random_catalog_schema_name())
.with_new_write_buffer() .with_new_write_buffer()
.with_new_object_store() .with_new_object_store()
} }
@ -57,10 +66,14 @@ impl TestConfig {
/// Create a minimal ingester configuration, using the dsn and /// Create a minimal ingester configuration, using the dsn and
/// write buffer configuration from other /// write buffer configuration from other
pub fn new_ingester(other: &TestConfig) -> Self { pub fn new_ingester(other: &TestConfig) -> Self {
Self::new(ServerType::Ingester, other.dsn()) Self::new(
.with_existing_write_buffer(other) ServerType::Ingester,
.with_existing_object_store(other) other.dsn(),
.with_default_ingester_options() other.catalog_schema_name(),
)
.with_existing_write_buffer(other)
.with_existing_object_store(other)
.with_default_ingester_options()
} }
/// Create a minimal querier configuration from the specified /// Create a minimal querier configuration from the specified
@ -80,13 +93,17 @@ impl TestConfig {
/// Create a minimal querier configuration from the specified /// Create a minimal querier configuration from the specified
/// ingester configuration, using the same dsn and object store /// ingester configuration, using the same dsn and object store
pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self { pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self {
Self::new(ServerType::Querier, ingester_config.dsn()) Self::new(
.with_existing_object_store(ingester_config) ServerType::Querier,
ingester_config.dsn(),
ingester_config.catalog_schema_name(),
)
.with_existing_object_store(ingester_config)
} }
/// Create a minimal all in one configuration /// Create a minimal all in one configuration
pub fn new_all_in_one(dsn: impl Into<String>) -> Self { pub fn new_all_in_one(dsn: impl Into<String>) -> Self {
Self::new(ServerType::AllInOne, dsn) Self::new(ServerType::AllInOne, dsn, random_catalog_schema_name())
.with_new_write_buffer() .with_new_write_buffer()
.with_new_object_store() .with_new_object_store()
.with_default_ingester_options() .with_default_ingester_options()
@ -104,6 +121,11 @@ impl TestConfig {
&self.dsn &self.dsn
} }
// Get the catalog postgres schema name
pub fn catalog_schema_name(&self) -> &str {
&self.catalog_schema_name
}
/// Adds default ingester options /// Adds default ingester options
fn with_default_ingester_options(self) -> Self { fn with_default_ingester_options(self) -> Self {
self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20") self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20")
@ -220,3 +242,14 @@ impl TestConfig {
&self.addrs &self.addrs
} }
} }
fn random_catalog_schema_name() -> String {
let mut rng = rand::thread_rng();
(&mut rng)
.sample_iter(rand::distributions::Alphanumeric)
.filter(|c| c.is_ascii_alphabetic())
.take(20)
.map(char::from)
.collect::<String>()
}

View File

@ -3,19 +3,18 @@
use assert_cmd::Command; use assert_cmd::Command;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use sqlx::{migrate::MigrateDatabase, Postgres}; use sqlx::{migrate::MigrateDatabase, Postgres};
use std::sync::Mutex; use std::{collections::BTreeSet, sync::Mutex};
// I really do want to block everything until the database is initialized... // I really do want to block everything until the database is initialized...
#[allow(clippy::await_holding_lock)] #[allow(clippy::await_holding_lock)]
#[allow(clippy::mutex_atomic)] static DB_INITIALIZED: Lazy<Mutex<BTreeSet<String>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));
static DB_INITIALIZED: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
/// Performs once-per-process database initialization, if necessary /// Performs once-per-process database initialization, if necessary
pub async fn initialize_db(dsn: &str) { pub async fn initialize_db(dsn: &str, schema_name: &str) {
let mut init = DB_INITIALIZED.lock().expect("Mutex poisoned"); let mut init = DB_INITIALIZED.lock().expect("Mutex poisoned");
// already done // already done
if *init { if init.contains(schema_name) {
return; return;
} }
@ -32,7 +31,8 @@ pub async fn initialize_db(dsn: &str) {
.unwrap() .unwrap()
.arg("catalog") .arg("catalog")
.arg("setup") .arg("setup")
.env("INFLUXDB_IOX_CATALOG_DSN", &dsn) .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
.env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
.ok() .ok()
.unwrap(); .unwrap();
@ -43,9 +43,10 @@ pub async fn initialize_db(dsn: &str) {
.arg("topic") .arg("topic")
.arg("update") .arg("update")
.arg("iox-shared") .arg("iox-shared")
.env("INFLUXDB_IOX_CATALOG_DSN", &dsn) .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
.env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
.ok() .ok()
.unwrap(); .unwrap();
*init = true; init.insert(schema_name.into());
} }

View File

@ -71,6 +71,19 @@ impl MiniCluster {
.server() .server()
} }
/// Restart ingester.
///
/// This will break all currently connected clients!
pub async fn restart_ingester(&mut self) {
self.ingester = Some(
self.ingester
.take()
.expect("ingester not initialized")
.restart_server()
.await,
)
}
/// Retrieve the underlying querier server, if set /// Retrieve the underlying querier server, if set
pub fn querier(&self) -> &TestServer { pub fn querier(&self) -> &TestServer {
self.querier self.querier

View File

@ -3,6 +3,7 @@ use futures::prelude::*;
use influxdb_iox_client::connection::Connection; use influxdb_iox_client::connection::Connection;
use std::{ use std::{
fmt::Debug, fmt::Debug,
fs::OpenOptions,
path::Path, path::Path,
process::{Child, Command}, process::{Child, Command},
str, str,
@ -104,7 +105,9 @@ impl TestServer {
async fn new(test_config: TestConfig) -> Self { async fn new(test_config: TestConfig) -> Self {
let ready = Mutex::new(ServerState::Started); let ready = Mutex::new(ServerState::Started);
let server_process = Arc::new(Mutex::new(Self::create_server_process(&test_config).await)); let server_process = Arc::new(Mutex::new(
Self::create_server_process(&test_config, None).await,
));
Self { Self {
ready, ready,
@ -217,18 +220,35 @@ impl TestServer {
async fn restart(&self) { async fn restart(&self) {
let mut ready_guard = self.ready.lock().await; let mut ready_guard = self.ready.lock().await;
let mut server_process = self.server_process.lock().await; let mut server_process = self.server_process.lock().await;
server_process.child.kill().unwrap(); kill_politely(&mut server_process.child, Duration::from_secs(5));
server_process.child.wait().unwrap(); *server_process =
*server_process = Self::create_server_process(&self.test_config).await; Self::create_server_process(&self.test_config, Some(server_process.log_path.clone()))
.await;
*ready_guard = ServerState::Started; *ready_guard = ServerState::Started;
} }
async fn create_server_process(test_config: &TestConfig) -> Process { async fn create_server_process(
test_config: &TestConfig,
log_path: Option<Box<Path>>,
) -> Process {
// Create a new file each time and keep it around to aid debugging // Create a new file each time and keep it around to aid debugging
let (log_file, log_path) = NamedTempFile::new() let (log_file, log_path) = match log_path {
.expect("opening log file") Some(log_path) => (
.keep() OpenOptions::new()
.expect("expected to keep"); .read(true)
.append(true)
.open(&log_path)
.expect("log file should still be there"),
log_path,
),
None => {
let (log_file, log_path) = NamedTempFile::new()
.expect("opening log file")
.keep()
.expect("expected to keep");
(log_file, log_path.into_boxed_path())
}
};
let stdout_log_file = log_file let stdout_log_file = log_file
.try_clone() .try_clone()
@ -248,7 +268,8 @@ impl TestServer {
let run_command = server_type.run_command(); let run_command = server_type.run_command();
let dsn = test_config.dsn(); let dsn = test_config.dsn();
initialize_db(dsn).await; let schema_name = test_config.catalog_schema_name();
initialize_db(dsn, schema_name).await;
// This will inherit environment from the test runner // This will inherit environment from the test runner
// in particular `LOG_FILTER` // in particular `LOG_FILTER`
@ -257,7 +278,8 @@ impl TestServer {
.arg("run") .arg("run")
.arg(run_command) .arg(run_command)
.env("LOG_FILTER", log_filter) .env("LOG_FILTER", log_filter)
.env("INFLUXDB_IOX_CATALOG_DSN", &dsn) .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
.env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
// add http/grpc address information // add http/grpc address information
.add_addr_env(server_type, test_config.addrs()) .add_addr_env(server_type, test_config.addrs())
.envs(test_config.env()) .envs(test_config.env())
@ -267,10 +289,7 @@ impl TestServer {
.spawn() .spawn()
.unwrap(); .unwrap();
Process { Process { child, log_path }
child,
log_path: log_path.into_boxed_path(),
}
} }
/// Polls the various services to ensure the server is /// Polls the various services to ensure the server is
@ -472,13 +491,7 @@ impl Drop for TestServer {
.try_lock() .try_lock()
.expect("should be able to get a server process lock"); .expect("should be able to get a server process lock");
if let Err(e) = server_lock.child.kill() { kill_politely(&mut server_lock.child, Duration::from_secs(1));
println!("Error killing child: {}", e);
}
if let Err(e) = server_lock.child.wait() {
println!("Error waiting on child exit: {}", e);
}
dump_log_to_stdout(self.test_config.server_type(), &server_lock.log_path); dump_log_to_stdout(self.test_config.server_type(), &server_lock.log_path);
} }
@ -529,3 +542,65 @@ fn dump_log_to_stdout(server_type: ServerType, log_path: &Path) {
println!("End {:?} TestServer Output", server_type); println!("End {:?} TestServer Output", server_type);
println!("****************"); println!("****************");
} }
/// Attempt to kill a child process politely.
fn kill_politely(child: &mut Child, wait: Duration) {
use nix::{
sys::{
signal::{self, Signal},
wait::waitpid,
},
unistd::Pid,
};
let pid = Pid::from_raw(child.id().try_into().unwrap());
// try to be polite
let wait_errored = match signal::kill(pid, Signal::SIGTERM) {
Ok(()) => wait_timeout(pid, wait).is_err(),
Err(e) => {
println!("Error sending SIGTERM to child: {e}");
true
}
};
if wait_errored {
// timeout => kill it
println!("Cannot terminate child politely, using SIGKILL...");
if let Err(e) = signal::kill(pid, Signal::SIGKILL) {
println!("Error sending SIGKILL to child: {e}");
}
if let Err(e) = waitpid(pid, None) {
println!("Cannot wait for child: {e}");
}
} else {
println!("Killed child politely");
}
}
/// Wait for given PID to exit with a timeout.
fn wait_timeout(pid: nix::unistd::Pid, timeout: Duration) -> Result<(), ()> {
use nix::sys::wait::waitpid;
// use some thread and channel trickery, see https://stackoverflow.com/a/42720480
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let waitpid_res = waitpid(pid, None).map(|_| ());
// errors if the receiver side is gone which is OK.
sender.send(waitpid_res).ok();
});
match receiver.recv_timeout(timeout) {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => {
println!("Cannot wait for child: {e}");
Err(())
}
Err(_) => {
println!("Timeout waiting for child");
Err(())
}
}
}

View File

@ -35,7 +35,7 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features
hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] } hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] }
hyper-rustls = { version = "0.23", features = ["http1", "http2", "log", "logging", "native-tokio", "rustls-native-certs", "tls12", "tokio-runtime", "webpki-roots", "webpki-tokio"] } hyper-rustls = { version = "0.23", features = ["http1", "http2", "log", "logging", "native-tokio", "rustls-native-certs", "tls12", "tokio-runtime", "webpki-roots", "webpki-tokio"] }
indexmap = { version = "1", default-features = false, features = ["std"] } indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["std"] } log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2", features = ["std"] } memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] } nom = { version = "7", features = ["alloc", "std"] }
@ -93,7 +93,7 @@ generic-array = { version = "0.14", default-features = false, features = ["more_
getrandom = { version = "0.2", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-a6292c17cd707f01 = { package = "hashbrown", version = "0.11", features = ["ahash", "inline-more", "raw"] } hashbrown-a6292c17cd707f01 = { package = "hashbrown", version = "0.11", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] } indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["std"] } log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2", features = ["std"] } memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] } nom = { version = "7", features = ["alloc", "std"] }
@ -116,26 +116,14 @@ tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
uuid = { version = "0.8", features = ["getrandom", "std", "v4"] } uuid = { version = "0.8", features = ["getrandom", "std", "v4"] }
[target.x86_64-unknown-linux-gnu.dependencies] [target.x86_64-unknown-linux-gnu.dependencies]
libc = { version = "0.2", default-features = false, features = ["extra_traits"] }
tokio-rustls = { version = "0.23" } tokio-rustls = { version = "0.23" }
[target.x86_64-unknown-linux-gnu.build-dependencies]
libc = { version = "0.2", default-features = false, features = ["extra_traits"] }
[target.x86_64-apple-darwin.dependencies] [target.x86_64-apple-darwin.dependencies]
libc = { version = "0.2", default-features = false, features = ["extra_traits"] }
tokio-rustls = { version = "0.23" } tokio-rustls = { version = "0.23" }
[target.x86_64-apple-darwin.build-dependencies]
libc = { version = "0.2", default-features = false, features = ["extra_traits"] }
[target.aarch64-apple-darwin.dependencies] [target.aarch64-apple-darwin.dependencies]
libc = { version = "0.2", default-features = false, features = ["extra_traits"] }
tokio-rustls = { version = "0.23" } tokio-rustls = { version = "0.23" }
[target.aarch64-apple-darwin.build-dependencies]
libc = { version = "0.2", default-features = false, features = ["extra_traits"] }
[target.x86_64-pc-windows-msvc.dependencies] [target.x86_64-pc-windows-msvc.dependencies]
scopeguard = { version = "1", features = ["use_std"] } scopeguard = { version = "1", features = ["use_std"] }
tokio = { version = "1", default-features = false, features = ["winapi"] } tokio = { version = "1", default-features = false, features = ["winapi"] }