Merge pull request #4304 from influxdata/crepererum/ng_end2end_tests_schema_names

test: use random catalog schema name for NG end2end tests

commit 06118b50be
@@ -38,6 +38,14 @@ pub struct CatalogDsnConfig {
         default_value = "10"
     )]
     pub max_catalog_connections: u32,
+
+    /// Schema name for PostgreSQL-based catalogs.
+    #[clap(
+        long = "--catalog-postgres-schema-name",
+        env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
+        default_value = iox_catalog::postgres::SCHEMA_NAME,
+    )]
+    pub postgres_schema_name: String,
 }

 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ArgEnum)]
@@ -56,7 +64,7 @@ impl CatalogDsnConfig {
             CatalogType::Postgres => Arc::new(
                 PostgresCatalog::connect(
                     app_name,
-                    iox_catalog::postgres::SCHEMA_NAME,
+                    &self.postgres_schema_name,
                     self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?,
                     self.max_catalog_connections,
                     metrics,

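Note on the new option: with clap, an explicit command-line flag wins over the environment variable, which in turn wins over the default (here the compiled-in iox_catalog::postgres::SCHEMA_NAME). A minimal, self-contained sketch of the same resolution pattern (hypothetical Demo struct; a placeholder literal stands in for the SCHEMA_NAME const; assumes clap 3.x with the derive and env features, and that the env var is unset when run):

use clap::Parser;

#[derive(Debug, Parser)]
struct Demo {
    /// Schema name for PostgreSQL-based catalogs.
    #[clap(
        long = "catalog-postgres-schema-name",
        env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
        default_value = "public"
    )]
    postgres_schema_name: String,
}

fn main() {
    // Nothing passed and env var unset: the default applies.
    let d = Demo::parse_from(["demo"]);
    assert_eq!(d.postgres_schema_name, "public");

    // An explicit flag overrides both the env var and the default.
    let d = Demo::parse_from(["demo", "--catalog-postgres-schema-name", "test_abc"]);
    assert_eq!(d.postgres_schema_name, "test_abc");
}
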
@@ -1,6 +1,7 @@
 use std::{collections::HashMap, sync::Arc};

 use http::{header::HeaderName, HeaderValue};
+use rand::Rng;
 use tempfile::TempDir;

 use crate::addrs::BindAddresses;
@@ -22,6 +23,9 @@ pub struct TestConfig {
     /// Catalog DSN value
     dsn: String,

+    /// Catalog schema name
+    catalog_schema_name: String,
+
     /// Write buffer directory, if needed
     write_buffer_dir: Option<Arc<TempDir>>,

@@ -35,12 +39,17 @@ pub struct TestConfig {
 impl TestConfig {
     /// Create a new TestConfig (tests should use one of the specific
     /// configuration setups below, such as [new_router2])
-    fn new(server_type: ServerType, dsn: impl Into<String>) -> Self {
+    fn new(
+        server_type: ServerType,
+        dsn: impl Into<String>,
+        catalog_schema_name: impl Into<String>,
+    ) -> Self {
         Self {
             env: HashMap::new(),
             client_headers: vec![],
             server_type,
             dsn: dsn.into(),
+            catalog_schema_name: catalog_schema_name.into(),
             write_buffer_dir: None,
             object_store_dir: None,
             addrs: Arc::new(BindAddresses::default()),
@@ -49,7 +58,7 @@ impl TestConfig {

     /// Create a minimal router2 configuration
     pub fn new_router2(dsn: impl Into<String>) -> Self {
-        Self::new(ServerType::Router2, dsn)
+        Self::new(ServerType::Router2, dsn, random_catalog_schema_name())
             .with_new_write_buffer()
             .with_new_object_store()
     }
@@ -57,10 +66,14 @@ impl TestConfig {
     /// Create a minimal ingester configuration, using the dsn and
     /// write buffer configuration from other
     pub fn new_ingester(other: &TestConfig) -> Self {
-        Self::new(ServerType::Ingester, other.dsn())
-            .with_existing_write_buffer(other)
-            .with_existing_object_store(other)
-            .with_default_ingester_options()
+        Self::new(
+            ServerType::Ingester,
+            other.dsn(),
+            other.catalog_schema_name(),
+        )
+        .with_existing_write_buffer(other)
+        .with_existing_object_store(other)
+        .with_default_ingester_options()
     }

     /// Create a minimal querier configuration from the specified
@@ -80,13 +93,17 @@ impl TestConfig {
     /// Create a minimal querier configuration from the specified
     /// ingester configuration, using the same dsn and object store
     pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self {
-        Self::new(ServerType::Querier, ingester_config.dsn())
-            .with_existing_object_store(ingester_config)
+        Self::new(
+            ServerType::Querier,
+            ingester_config.dsn(),
+            ingester_config.catalog_schema_name(),
+        )
+        .with_existing_object_store(ingester_config)
     }

     /// Create a minimal all in one configuration
     pub fn new_all_in_one(dsn: impl Into<String>) -> Self {
-        Self::new(ServerType::AllInOne, dsn)
+        Self::new(ServerType::AllInOne, dsn, random_catalog_schema_name())
             .with_new_write_buffer()
             .with_new_object_store()
             .with_default_ingester_options()
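Taken together: whichever config a test creates first draws a fresh random schema name, and every dependent config inherits it, so all servers in one test share a catalog schema while parallel tests stay isolated from each other. A sketch of the typical wiring (illustrative only; dsn is a placeholder, the constructors are the ones above):

fn wire_up(dsn: &str) {
    let router = TestConfig::new_router2(dsn); // draws a fresh random schema
    let ingester = TestConfig::new_ingester(&router); // inherits the router's schema
    let querier = TestConfig::new_querier_without_ingester(&ingester); // same schema again

    assert_eq!(router.catalog_schema_name(), querier.catalog_schema_name());
}
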
@@ -104,6 +121,11 @@ impl TestConfig {
         &self.dsn
     }

+    /// Get the catalog postgres schema name
+    pub fn catalog_schema_name(&self) -> &str {
+        &self.catalog_schema_name
+    }
+
     /// Adds default ingester options
     fn with_default_ingester_options(self) -> Self {
         self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20")
@@ -220,3 +242,14 @@ impl TestConfig {
         &self.addrs
     }
 }
+
+fn random_catalog_schema_name() -> String {
+    let mut rng = rand::thread_rng();
+
+    (&mut rng)
+        .sample_iter(rand::distributions::Alphanumeric)
+        .filter(|c| c.is_ascii_alphabetic())
+        .take(20)
+        .map(char::from)
+        .collect::<String>()
+}

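The filter matters here: Alphanumeric yields ASCII letters and digits, and keeping only is_ascii_alphabetic bytes means the 20-character name can never start with a digit, so it is always usable as an unquoted PostgreSQL identifier. A test one might add alongside the helper to pin this down (illustrative sketch, not part of the commit; the uniqueness check is probabilistic but the 52^20 name space makes collisions vanishingly unlikely):

#[test]
fn random_catalog_schema_names_are_alphabetic() {
    let name = random_catalog_schema_name();
    assert_eq!(name.len(), 20);
    assert!(name.chars().all(|c| c.is_ascii_alphabetic()));

    // Distinct calls should essentially never collide, which is what
    // keeps parallel test runs isolated from each other.
    assert_ne!(name, random_catalog_schema_name());
}
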
@@ -3,19 +3,18 @@
 use assert_cmd::Command;
 use once_cell::sync::Lazy;
 use sqlx::{migrate::MigrateDatabase, Postgres};
-use std::sync::Mutex;
+use std::{collections::BTreeSet, sync::Mutex};

 // I really do want to block everything until the database is initialized...
 #[allow(clippy::await_holding_lock)]
-#[allow(clippy::mutex_atomic)]
-static DB_INITIALIZED: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
+static DB_INITIALIZED: Lazy<Mutex<BTreeSet<String>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));

 /// Performs once-per-process database initialization, if necessary
-pub async fn initialize_db(dsn: &str) {
+pub async fn initialize_db(dsn: &str, schema_name: &str) {
     let mut init = DB_INITIALIZED.lock().expect("Mutex poisoned");

     // already done
-    if *init {
+    if init.contains(schema_name) {
         return;
     }

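This change generalizes a once-per-process boolean into a once-per-schema set; the clippy::mutex_atomic allow also goes away, since a Mutex<BTreeSet<_>> is no longer a mutex wrapped around something atomic. The guard pattern in isolation (compressed sketch with hypothetical names; the real code above keeps the contains/insert pair split around the actual setup work, holding the lock across it):

use once_cell::sync::Lazy;
use std::{collections::BTreeSet, sync::Mutex};

static DONE: Lazy<Mutex<BTreeSet<String>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));

fn init_once(schema_name: &str, setup: impl FnOnce()) {
    // The lock is held for the whole call, so concurrent callers block
    // until any in-flight initialization finishes.
    let mut done = DONE.lock().expect("Mutex poisoned");
    if done.insert(schema_name.to_string()) {
        // insert() returns true only for the first caller per schema.
        setup();
    }
}
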
@@ -32,7 +31,8 @@ pub async fn initialize_db(dsn: &str) {
         .unwrap()
         .arg("catalog")
         .arg("setup")
-        .env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
+        .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
+        .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
         .ok()
         .unwrap();

@@ -43,9 +43,10 @@ pub async fn initialize_db(dsn: &str) {
         .arg("topic")
         .arg("update")
         .arg("iox-shared")
-        .env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
+        .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
+        .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
         .ok()
         .unwrap();

-    *init = true;
+    init.insert(schema_name.into());
 }

@@ -268,7 +268,8 @@ impl TestServer {
         let run_command = server_type.run_command();

         let dsn = test_config.dsn();
-        initialize_db(dsn).await;
+        let schema_name = test_config.catalog_schema_name();
+        initialize_db(dsn, schema_name).await;

         // This will inherit environment from the test runner
         // in particular `LOG_FILTER`
@@ -277,7 +278,8 @@ impl TestServer {
            .arg("run")
            .arg(run_command)
            .env("LOG_FILTER", log_filter)
-            .env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
+            .env("INFLUXDB_IOX_CATALOG_DSN", dsn)
+            .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
            // add http/grpc address information
            .add_addr_env(server_type, test_config.addrs())
            .envs(test_config.env())
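End to end, each spawned server process receives both variables, so every server connects to the shared Postgres database but operates inside its test's private schema. Roughly equivalent plumbing with std::process::Command (sketch only; the binary name, subcommand, DSN, and schema name are placeholders):

use std::process::Command;

fn main() {
    let status = Command::new("influxdb_iox")
        .arg("run")
        .arg("router2")
        .env("INFLUXDB_IOX_CATALOG_DSN", "postgres://localhost:5432/iox_shared")
        .env(
            "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
            "kQwZrTnBvLpXsDfGhJmC",
        )
        .status()
        .expect("failed to spawn server");
    println!("server exited with: {status}");
}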