chore: Merge remote-tracking branch 'origin/main' into smith/remove-transactions-main

pull/24376/head
Jeffrey Smith II 2023-05-11 13:16:15 -05:00
commit e73564ec0e
100 changed files with 325 additions and 310 deletions

114
Cargo.lock generated
View File

@ -276,7 +276,7 @@ dependencies = [
"paste",
"prost",
"tokio",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -520,7 +520,7 @@ dependencies = [
"http",
"observability_deps",
"snafu",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -947,7 +947,7 @@ dependencies = [
"reqwest",
"thiserror",
"tokio",
"tonic 0.9.2",
"tonic",
"tower",
"workspace-hack",
]
@ -1085,21 +1085,21 @@ dependencies = [
[[package]]
name = "console-api"
version = "0.4.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost",
"prost-types",
"tonic 0.8.3",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be"
checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7"
dependencies = [
"console-api",
"crossbeam-channel",
@ -1107,14 +1107,14 @@ dependencies = [
"futures",
"hdrhistogram",
"humantime",
"parking_lot 0.11.2",
"parking_lot 0.12.1",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.8.3",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
@ -1862,7 +1862,7 @@ dependencies = [
"prost",
"snafu",
"tokio",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -2049,7 +2049,7 @@ dependencies = [
"query_functions",
"serde",
"snafu",
"tonic 0.9.2",
"tonic",
"tonic-build",
"workspace-hack",
]
@ -2106,7 +2106,7 @@ dependencies = [
"prost-build",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tonic-build",
"tower",
"workspace-hack",
@ -2119,7 +2119,7 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"tonic 0.9.2",
"tonic",
"tonic-build",
"workspace-hack",
]
@ -2131,7 +2131,7 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"tonic 0.9.2",
"tonic",
"tonic-build",
"workspace-hack",
]
@ -2605,7 +2605,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace_exporters",
"trogging",
"uuid",
@ -2634,7 +2634,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -2646,7 +2646,7 @@ dependencies = [
"generated_types",
"observability_deps",
"prost",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -2725,7 +2725,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace",
"uuid",
"wal",
@ -2760,7 +2760,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"wal",
"workspace-hack",
]
@ -3017,7 +3017,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.9.2",
"tonic",
"tonic-health",
"tonic-reflection",
"tower",
@ -3115,7 +3115,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace",
"workspace-hack",
]
@ -4454,7 +4454,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace",
"trace_exporters",
"trace_http",
@ -4757,7 +4757,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"trace",
"workspace-hack",
]
@ -5000,7 +5000,7 @@ dependencies = [
"metric",
"parking_lot 0.12.1",
"predicate",
"tonic 0.9.2",
"tonic",
"trace",
"tracker",
"workspace-hack",
@ -5016,7 +5016,7 @@ dependencies = [
"metric",
"observability_deps",
"tokio",
"tonic 0.9.2",
"tonic",
"uuid",
"workspace-hack",
]
@ -5046,7 +5046,7 @@ dependencies = [
"service_common",
"snafu",
"tokio",
"tonic 0.9.2",
"tonic",
"trace",
"trace_http",
"tracker",
@ -5085,7 +5085,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"trace",
"trace_http",
"tracker",
@ -5105,7 +5105,7 @@ dependencies = [
"observability_deps",
"paste",
"tokio",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5123,7 +5123,7 @@ dependencies = [
"observability_deps",
"parquet_file",
"tokio",
"tonic 0.9.2",
"tonic",
"uuid",
"workspace-hack",
]
@ -5138,7 +5138,7 @@ dependencies = [
"metric",
"observability_deps",
"tokio",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5148,7 +5148,7 @@ version = "0.1.0"
dependencies = [
"generated_types",
"observability_deps",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5629,7 +5629,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5868,38 +5868,6 @@ dependencies = [
"winnow",
]
[[package]]
name = "tonic"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.13.1",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.9.2"
@ -5955,7 +5923,7 @@ dependencies = [
"prost",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -5968,7 +5936,7 @@ dependencies = [
"prost-types",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -6103,16 +6071,6 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
@ -6802,7 +6760,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.9.2",
"tonic",
"tower",
"tracing",
"tracing-core",

View File

@ -19,7 +19,7 @@ pub mod ingester;
pub mod ingester_address;
pub mod object_store;
pub mod querier;
pub mod router2;
pub mod router;
pub mod run_config;
pub mod single_tenant;
pub mod socket_addr;

View File

@ -14,7 +14,7 @@ use std::{
/// CLI config for the router using the RPC write path
#[derive(Debug, Clone, clap::Parser)]
#[allow(missing_copy_implementations)]
pub struct Router2Config {
pub struct RouterConfig {
/// Addr for connection to authz
#[clap(
long = CONFIG_AUTHZ_FLAG,

View File

@ -120,7 +120,7 @@ INFLUXDB_IOX_CATALOG_DSN=postgres://postgres@localhost:5432/postgres \
OBJECT_STORE=file \
DATABASE_DIRECTORY=~/data_dir \
LOG_FILTER=info \
./target/release/influxdb_iox run router2
./target/release/influxdb_iox run router
```
# Step 5: Ingest data

View File

@ -48,7 +48,7 @@ backtrace = "0.3"
bytes = "1.4"
clap = { version = "4", features = ["derive", "env"] }
comfy-table = { version = "6.1", default-features = false }
console-subscriber = { version = "0.1.8", optional = true, features = ["parking_lot"] }
console-subscriber = { version = "0.1.9", optional = true, features = ["parking_lot"] }
dotenvy = "0.15.7"
futures = "0.3"
futures-util = { version = "0.3" }

View File

@ -10,7 +10,7 @@ use clap_blocks::{
ingester_address::IngesterAddress,
object_store::{make_object_store, ObjectStoreConfig},
querier::QuerierConfig,
router2::Router2Config,
router::RouterConfig,
run_config::RunConfig,
single_tenant::{
CONFIG_AUTHZ_ENV_NAME, CONFIG_AUTHZ_FLAG, CONFIG_CST_ENV_NAME, CONFIG_CST_FLAG,
@ -27,7 +27,7 @@ use ioxd_common::{
use ioxd_compactor::create_compactor_server_type;
use ioxd_ingester::create_ingester_server_type;
use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs};
use ioxd_router::create_router2_server_type;
use ioxd_router::create_router_server_type;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use parquet_file::storage::{ParquetStorage, StorageId};
@ -466,7 +466,7 @@ impl Config {
rpc_write_max_incoming_bytes: 1024 * 1024 * 1024, // 1GiB
};
let router_config = Router2Config {
let router_config = RouterConfig {
authz_address: authz_address.clone(),
single_tenant_deployment,
http_request_limit: 1_000,
@ -555,7 +555,7 @@ struct SpecializedConfig {
catalog_dsn: CatalogDsnConfig,
ingester_config: IngesterConfig,
router_config: Router2Config,
router_config: RouterConfig,
compactor_config: CompactorConfig,
querier_config: QuerierConfig,
}
@ -611,7 +611,7 @@ pub async fn command(config: Config) -> Result<()> {
}));
info!("starting router");
let router = create_router2_server_type(
let router = create_router_server_type(
&common_state,
Arc::clone(&metrics),
Arc::clone(&catalog),

View File

@ -7,7 +7,7 @@ mod garbage_collector;
mod ingester;
mod main;
mod querier;
mod router2;
mod router;
mod test;
#[derive(Debug, Snafu)]
@ -22,8 +22,8 @@ pub enum Error {
#[snafu(display("Error in querier subcommand: {}", source))]
QuerierError { source: querier::Error },
#[snafu(display("Error in router2 subcommand: {}", source))]
Router2Error { source: router2::Error },
#[snafu(display("Error in router subcommand: {}", source))]
RouterError { source: router::Error },
#[snafu(display("Error in ingester subcommand: {}", source))]
IngesterError { source: ingester::Error },
@ -54,7 +54,7 @@ impl Config {
Some(Command::Compactor(config)) => config.run_config.logging_config(),
Some(Command::GarbageCollector(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
Some(Command::Router2(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
Some(Command::Ingester(config)) => config.run_config.logging_config(),
Some(Command::AllInOne(config)) => &config.logging_config,
Some(Command::Test(config)) => config.run_config.logging_config(),
@ -71,8 +71,9 @@ enum Command {
/// Run the server in querier mode
Querier(querier::Config),
/// Run the server in router2 mode
Router2(router2::Config),
/// Run the server in router mode
#[clap(alias = "router2")]
Router(router::Config),
/// Run the server in ingester mode
#[clap(alias = "ingester2")]
@ -100,7 +101,7 @@ pub async fn command(config: Config) -> Result<()> {
.await
.context(GarbageCollectorSnafu),
Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu),
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
Some(Command::AllInOne(config)) => all_in_one::command(config).await.context(AllInOneSnafu),
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),

View File

@ -1,8 +1,8 @@
//! Command line options for running a router2 that uses the RPC write path.
//! Command line options for running a router that uses the RPC write path.
use super::main;
use crate::process_info::setup_metric_registry;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, router2::Router2Config,
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, router::RouterConfig,
run_config::RunConfig,
};
use iox_time::{SystemProvider, TimeProvider};
@ -10,7 +10,7 @@ use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_router::create_router2_server_type;
use ioxd_router::create_router_server_type;
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
@ -64,7 +64,7 @@ pub struct Config {
pub(crate) catalog_dsn: CatalogDsnConfig,
#[clap(flatten)]
pub(crate) router_config: Router2Config,
pub(crate) router_config: RouterConfig,
}
pub async fn command(config: Config) -> Result<()> {
@ -80,7 +80,7 @@ pub async fn command(config: Config) -> Result<()> {
let catalog = config
.catalog_dsn
.get_catalog("router2", Arc::clone(&metrics))
.get_catalog("router", Arc::clone(&metrics))
.await?;
let object_store = make_object_store(config.run_config.object_store_config())
@ -92,7 +92,7 @@ pub async fn command(config: Config) -> Result<()> {
&metrics,
));
let server_type = create_router2_server_type(
let server_type = create_router_server_type(
&common_state,
Arc::clone(&metrics),
catalog,
@ -101,7 +101,7 @@ pub async fn command(config: Config) -> Result<()> {
)
.await?;
info!("starting router2");
info!("starting router");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metrics).await?)
}

View File

@ -6,4 +6,4 @@
// The tests are defined in the submodules of [`end_to_end_cases`]
mod end_to_end_cases;
mod query_tests2;
mod query_tests;

View File

@ -25,7 +25,7 @@ async fn dsn_file() {
println!("databse_url is {database_url}");
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let mut cluster = MiniCluster::create_non_shared(database_url).await;
StepTest::new(
&mut cluster,

View File

@ -60,7 +60,7 @@ async fn parquet_to_lp() {
// The test below assumes a specific partition id, so use a
// non-shared one here so concurrent tests don't interfere with
// each other
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let mut cluster = MiniCluster::create_non_shared(database_url).await;
let line_protocol = "my_awesome_table,tag1=A,tag2=B val=42i 123456";
@ -184,7 +184,7 @@ async fn schema_cli() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -245,7 +245,7 @@ async fn write_and_query() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -320,7 +320,7 @@ async fn query_error_handling() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -360,7 +360,7 @@ async fn influxql_error_handling() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -480,7 +480,7 @@ async fn namespaces_cli() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -516,7 +516,7 @@ async fn namespaces_cli() {
async fn namespace_retention() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -696,7 +696,7 @@ async fn namespace_retention() {
async fn namespace_deletion() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -784,7 +784,7 @@ async fn query_ingester() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -905,7 +905,7 @@ async fn query_ingester() {
async fn namespace_update_service_limit() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,

View File

@ -1,5 +1,5 @@
//! Tests that we still support running using deprecated names so that deployments continue to work
//! while transitioning.
//! while transitioning. There was never a `querier2` command, so there isn't a test for it here.
use assert_cmd::Command;
use predicates::prelude::*;
@ -29,6 +29,26 @@ async fn ingester2_runs_ingester() {
));
}
#[tokio::test]
async fn router2_runs_router() {
let tmpdir = tempdir().unwrap();
let addrs = BindAddresses::default();
Command::cargo_bin("influxdb_iox")
.unwrap()
.args(["run", "router2", "-v"])
.env_clear()
.env("HOME", tmpdir.path())
.env("INFLUXDB_IOX_WAL_DIRECTORY", tmpdir.path())
.env("INFLUXDB_IOX_CATALOG_TYPE", "memory")
.add_addr_env(ServerType::Router, &addrs)
.timeout(Duration::from_secs(5))
.assert()
.failure()
.stderr(predicate::str::contains("error: unrecognized subcommand 'router2'").not())
.stdout(predicate::str::contains("InfluxDB IOx Router server ready"));
}
#[tokio::test]
async fn compactor2_runs_compactor() {
let tmpdir = tempdir().unwrap();

View File

@ -11,8 +11,8 @@ async fn shard_id_greater_than_num_shards_is_invalid() {
let database_url = maybe_skip_integration!();
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config).with_querier_mem_pool_bytes(1);
let compactor_config = TestConfig::new_compactor(&ingester_config).with_compactor_shards(
2, // num shards 2
100, // and shard id > num shards; not valid
@ -96,8 +96,8 @@ async fn sharded_compactor_0_always_compacts_partition_1() {
// The test below assumes a specific partition id, and it needs to customize the compactor
// config, so use a non-shared minicluster here.
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config).with_querier_mem_pool_bytes(1);
let compactor_config = TestConfig::new_compactor(&ingester_config).with_compactor_shards(
2, // num shards 2
0, // shard ID 0, which will always get partition ID 1
@ -179,8 +179,8 @@ async fn sharded_compactor_1_never_compacts_partition_1() {
// The test below assumes a specific partition id, and it needs to customize the compactor
// config, so use a non-shared minicluster here.
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config).with_querier_mem_pool_bytes(1);
let compactor_config = TestConfig::new_compactor(&ingester_config).with_compactor_shards(
2, // num shards 2
1, // shard ID 1, which will never get partition ID 1

View File

@ -8,7 +8,7 @@ use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTes
#[tokio::test]
pub async fn test_panic() {
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,

View File

@ -36,7 +36,7 @@ async fn flightsql_adhoc_query() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -79,7 +79,7 @@ async fn flightsql_adhoc_query_error() {
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -119,7 +119,7 @@ async fn flightsql_prepared_query() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -166,7 +166,7 @@ async fn flightsql_get_sql_infos() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -242,7 +242,7 @@ async fn flightsql_get_catalogs() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -286,7 +286,7 @@ async fn flightsql_get_catalogs_matches_information_schema() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -346,7 +346,7 @@ async fn flightsql_get_cross_reference() {
let foreign_table_name = "foreign_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -403,7 +403,7 @@ async fn flightsql_get_tables() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -573,7 +573,7 @@ async fn flightsql_get_tables_decoded_table_schema() {
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -646,7 +646,7 @@ async fn flightsql_get_tables_matches_information_schema() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -716,7 +716,7 @@ async fn flightsql_get_table_types() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -761,7 +761,7 @@ async fn flightsql_get_table_types_matches_information_schema() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -822,7 +822,7 @@ async fn flightsql_get_db_schemas() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -932,7 +932,7 @@ async fn flightsql_get_db_schema_matches_information_schema() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -999,7 +999,7 @@ async fn flightsql_get_exported_keys() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1045,7 +1045,7 @@ async fn flightsql_get_imported_keys() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1091,7 +1091,7 @@ async fn flightsql_get_primary_keys() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1137,7 +1137,7 @@ async fn flightsql_get_xdbc_type_info() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1212,7 +1212,7 @@ async fn flightsql_jdbc() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1269,7 +1269,7 @@ async fn flightsql_jdbc_authz_token() {
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let mut cluster = MiniCluster::create_non_shared_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token =
@ -1338,7 +1338,7 @@ async fn flightsql_jdbc_authz_handshake() {
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let mut cluster = MiniCluster::create_non_shared_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token =
@ -1509,7 +1509,7 @@ async fn flightsql_schema_matches() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1635,7 +1635,7 @@ async fn authz() {
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let mut cluster = MiniCluster::create_non_shared_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ_SCHEMA"]);
@ -1731,7 +1731,7 @@ async fn flightsql_client_header_same_database() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1782,7 +1782,7 @@ async fn flightsql_client_header_different_database() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -1822,7 +1822,7 @@ async fn flightsql_client_header_no_database() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,

View File

@ -12,7 +12,7 @@ async fn influxql_returns_error() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -49,7 +49,7 @@ async fn influxql_select_returns_results() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -86,7 +86,7 @@ async fn authz() {
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let mut cluster = MiniCluster::create_non_shared_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ"]);

View File

@ -13,7 +13,7 @@ async fn persist_on_demand() {
let database_url = maybe_skip_integration!();
let table_name = "mytable";
let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url).await;
StepTest::new(
&mut cluster,
@ -98,7 +98,7 @@ async fn ingester_flight_api() {
// Set up cluster
// Don't use a shared cluster because the ingester is going to be restarted
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let mut cluster = MiniCluster::create_non_shared(database_url).await;
// Write some data into the v2 HTTP API to set up the namespace and schema ==============
let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
@ -176,7 +176,7 @@ async fn ingester_flight_api_namespace_not_found() {
let database_url = maybe_skip_integration!();
// Set up cluster
let cluster = MiniCluster::create_shared2(database_url).await;
let cluster = MiniCluster::create_shared(database_url).await;
// query the ingester
let query = IngesterQueryRequest::new(
@ -204,7 +204,7 @@ async fn ingester_flight_api_table_not_found() {
let database_url = maybe_skip_integration!();
// Set up cluster
let cluster = MiniCluster::create_shared2(database_url).await;
let cluster = MiniCluster::create_shared(database_url).await;
// Write some data into the v2 HTTP API ==============
let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");

View File

@ -13,8 +13,8 @@ async fn querier_namespace_client() {
let table_name = "the_table";
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config);
// Set up the cluster ====================================
let cluster = MiniCluster::new()
@ -49,7 +49,7 @@ async fn soft_deletion() {
// Set up the cluster ====================================
// cannot use shared cluster because we're going to restart services
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let mut cluster = MiniCluster::create_non_shared(database_url).await;
let namespace_name = cluster.namespace().to_string();
let table_name = "ananas";

View File

@ -24,7 +24,7 @@ async fn basic_ingester() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url).await;
StepTest::new(
&mut cluster,
@ -62,7 +62,7 @@ async fn never_persist_really_never_persists() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url).await;
StepTest::new(
&mut cluster,
@ -90,7 +90,7 @@ async fn basic_on_parquet() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -126,9 +126,9 @@ async fn basic_empty() {
// Set up the cluster ====================================
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
// specially create a querier2 config that is NOT connected to the ingester
let querier_config = TestConfig::new_querier2_without_ingester(&ingester_config);
let router_config = TestConfig::new_router(&ingester_config);
// specially create a querier config that is NOT connected to the ingester
let querier_config = TestConfig::new_querier_without_ingester(&ingester_config);
let mut cluster = MiniCluster::new()
.with_ingester(ingester_config)
@ -197,9 +197,9 @@ async fn basic_no_ingester_connection() {
// Set up the cluster ====================================
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
// specially create a querier2 config that is NOT connected to the ingester
let querier_config = TestConfig::new_querier2_without_ingester(&ingester_config);
let router_config = TestConfig::new_router(&ingester_config);
// specially create a querier config that is NOT connected to the ingester
let querier_config = TestConfig::new_querier_without_ingester(&ingester_config);
let mut cluster = MiniCluster::new()
.with_ingester(ingester_config)
@ -245,7 +245,7 @@ async fn query_after_persist_sees_new_files() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
let steps = vec![
Step::RecordNumParquetFiles,
@ -308,7 +308,7 @@ async fn table_not_found_on_ingester() {
// Set up the cluster ====================================
// cannot use shared cluster because we're restarting the ingester
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let mut cluster = MiniCluster::create_non_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -363,7 +363,7 @@ async fn issue_4631_a() {
let database_url = maybe_skip_integration!();
// Set up a cluster configured to never persist automatically
let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url).await;
let steps = vec![
Step::RecordNumParquetFiles,
@ -434,7 +434,7 @@ async fn issue_4631_b() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -496,7 +496,7 @@ async fn unsupported_sql_returns_error() {
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
fn make_error_message(name: &str) -> String {
format!("Error while planning query: This feature is not implemented: Unsupported logical plan: {name}")
@ -548,7 +548,7 @@ async fn table_or_namespace_not_found() {
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -637,8 +637,8 @@ async fn oom_protection() {
// Set up the cluster ====================================
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config).with_querier_mem_pool_bytes(1);
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
@ -714,7 +714,7 @@ async fn authz() {
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let mut cluster = MiniCluster::create_non_shared_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ"]);

View File

@ -6,7 +6,7 @@ mod read_filter;
mod read_group;
mod read_window_aggregate;
use crate::query_tests2::setups::SETUPS;
use crate::query_tests::setups::SETUPS;
use async_trait::async_trait;
use futures::FutureExt;
use observability_deps::tracing::*;
@ -21,7 +21,7 @@ pub(crate) async fn run_no_data_test(custom: FCustom) {
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(&mut cluster, vec![Step::Custom(custom)])
.run()
@ -34,7 +34,7 @@ pub(crate) async fn run_data_test(generator: Arc<DataGenerator>, custom: FCustom
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -90,7 +90,7 @@ trait InfluxRpcTest: Send + Sync + 'static {
info!("Using setup {setup_name}");
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2_never_persist(database_url.clone()).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url.clone()).await;
let setup_steps = SETUPS
.get(setup_name)

View File

@ -368,7 +368,7 @@ async fn do_read_filter_test(
let expected_frames: Vec<String> = expected_frames.into_iter().map(|s| s.to_string()).collect();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
let line_protocol = input_lines.join("\n");
StepTest::new(

View File

@ -243,7 +243,7 @@ async fn do_test_invalid_group_key(variant: InvalidGroupKey) {
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -306,7 +306,7 @@ async fn do_read_group_test(
let expected_frames: Vec<String> = expected_frames.into_iter().map(|s| s.to_string()).collect();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
let line_protocol = input_lines.join("\n");
StepTest::new(

View File

@ -80,7 +80,7 @@ async fn do_read_window_aggregate_test(
let expected_frames: Vec<String> = expected_frames.into_iter().map(|s| s.to_string()).collect();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
let line_protocol = input_lines.join("\n");
StepTest::new(

View File

@ -21,9 +21,9 @@ async fn basic_multi_ingesters() {
.map(|ingester_config| ingester_config.ingester_base())
.collect();
let router_config =
TestConfig::new_router2(&ingester_configs[0]).with_ingester_addresses(&ingester_addresses);
TestConfig::new_router(&ingester_configs[0]).with_ingester_addresses(&ingester_addresses);
let querier_config =
TestConfig::new_querier2(&ingester_configs[0]).with_ingester_addresses(&ingester_addresses);
TestConfig::new_querier(&ingester_configs[0]).with_ingester_addresses(&ingester_addresses);
let mut cluster = MiniCluster::new();
for ingester_config in ingester_configs {
@ -103,12 +103,12 @@ async fn write_replication() {
.iter()
.map(|ingester_config| ingester_config.ingester_base())
.collect();
let router_config = TestConfig::new_router2(&ingester_configs[0])
let router_config = TestConfig::new_router(&ingester_configs[0])
.with_ingester_addresses(&ingester_addresses)
// Require both ingesters to get this write to be counted as a full write
.with_rpc_write_replicas(NonZeroUsize::new(2).unwrap());
let querier_config =
TestConfig::new_querier2(&ingester_configs[0]).with_ingester_addresses(&ingester_addresses);
TestConfig::new_querier(&ingester_configs[0]).with_ingester_addresses(&ingester_addresses);
let mut cluster = MiniCluster::new();
for ingester_config in ingester_configs {

View File

@ -16,7 +16,7 @@ async fn remote_store_get_table() {
let table_name = "my_awesome_table";
let other_table_name = "my_ordinary_table";
let mut cluster = MiniCluster::create_shared2(database_url).await;
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
@ -243,7 +243,7 @@ async fn remote_partition_and_get_from_store_and_pull() {
// The test below assumes a specific partition id, so use a
// non-shared one here so concurrent tests don't interfere with
// each other
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let mut cluster = MiniCluster::create_non_shared(database_url).await;
StepTest::new(
&mut cluster,

View File

@ -74,7 +74,7 @@ async fn authz() {
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let mut cluster = MiniCluster::create_non_shared_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ"]);

View File

@ -1,4 +1,4 @@
use crate::query_tests2::setups::SETUPS;
use crate::query_tests::setups::SETUPS;
use data_types::ColumnType;
use futures::FutureExt;
use observability_deps::tracing::*;
@ -35,7 +35,7 @@ impl SchemaTest {
info!("Using setup {setup_name}");
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2_never_persist(database_url.clone()).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url.clone()).await;
let setup_steps = SETUPS
.get(setup_name)

View File

@ -33,7 +33,7 @@
//! -- IOX_SETUP: OneMeasurementFourChunksWithDuplicates
//! ```
//!
//! To add a new setup, follow the pattern in `influxdb_iox/tests/query_tests2/setups.rs`.
//! To add a new setup, follow the pattern in `influxdb_iox/tests/query_tests/setups.rs`.
use super::framework::{ChunkStage, TestCase};

View File

@ -69,14 +69,14 @@ impl TestCase {
// Setup that differs by chunk stage.
let mut cluster = match chunk_stage {
ChunkStage::Ingester => {
MiniCluster::create_shared2_never_persist(database_url.clone()).await
MiniCluster::create_shared_never_persist(database_url.clone()).await
}
ChunkStage::Parquet => MiniCluster::create_shared2(database_url.clone()).await,
ChunkStage::Parquet => MiniCluster::create_shared(database_url.clone()).await,
ChunkStage::All => unreachable!("See `impl IntoIterator for ChunkStage`"),
};
let given_input_path: PathBuf = self.input.into();
let mut input_path = PathBuf::from("tests/query_tests2/");
let mut input_path = PathBuf::from("tests/query_tests/");
input_path.push(given_input_path.clone());
let contents = fs::read_to_string(&input_path).unwrap_or_else(|_| {
panic!("Could not read test case file `{}`", input_path.display())

View File

@ -1,6 +1,6 @@
//! Tests of SQL queries that are expected to return particular errors.
use crate::query_tests2::setups::SETUPS;
use crate::query_tests::setups::SETUPS;
use observability_deps::tracing::*;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest};
@ -94,7 +94,7 @@ impl SqlErrorTest {
info!("Using setup {setup_name}");
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2_never_persist(database_url.clone()).await;
let mut cluster = MiniCluster::create_shared_never_persist(database_url.clone()).await;
let setup_steps = SETUPS
.get(setup_name)

View File

@ -17,10 +17,10 @@ Run ingester:
./target/debug/influxdb_iox run ingester --api-bind=127.0.0.1:8081 --grpc-bind=127.0.0.1:8042 --wal-directory /tmp/iox/wal --catalog-dsn postgres:///iox_shared --object-store=file --data-dir=/tmp/iox/obj -v
```
Run router2:
Run router:
```bash
./target/debug/influxdb_iox run router2 --api-bind=127.0.0.1:8080 --grpc-bind=127.0.0.1:8085 --ingester-addresses=127.0.0.1:8042 --catalog-dsn postgres:///iox_shared -v
./target/debug/influxdb_iox run router --api-bind=127.0.0.1:8080 --grpc-bind=127.0.0.1:8085 --ingester-addresses=127.0.0.1:8042 --catalog-dsn postgres:///iox_shared -v
```
Run querier:

View File

@ -5,7 +5,7 @@ use std::{
use async_trait::async_trait;
use authz::{Authorizer, IoxAuthorizer};
use clap_blocks::router2::Router2Config;
use clap_blocks::router::RouterConfig;
use data_types::{DefaultPartitionTemplate, NamespaceName};
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
@ -91,7 +91,7 @@ impl<D, N> RpcWriteRouterServerType<D, N> {
impl<D, N> std::fmt::Debug for RpcWriteRouterServerType<D, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RpcWriteRouter")
write!(f, "Router")
}
}
@ -191,12 +191,12 @@ impl HttpApiErrorSource for IoxHttpErrorAdaptor {
}
/// Instantiate a router server that uses the RPC write path
pub async fn create_router2_server_type(
pub async fn create_router_server_type(
common_state: &CommonServerState,
metrics: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
router_config: &Router2Config,
router_config: &RouterConfig,
) -> Result<Arc<dyn ServerType>> {
let ingester_connections = router_config.ingester_addresses.iter().map(|addr| {
let addr = addr.to_string();

View File

@ -11,6 +11,7 @@ use self::{
balancer::Balancer,
circuit_breaker::CircuitBreaker,
circuit_breaking_client::{CircuitBreakerState, CircuitBreakingClient},
client::RpcWriteClientError,
upstream_snapshot::UpstreamSnapshot,
};
@ -35,9 +36,9 @@ pub const RPC_TIMEOUT: Duration = Duration::from_secs(5);
/// Errors experienced when submitting an RPC write request to an Ingester.
#[derive(Debug, Error)]
pub enum RpcWriteError {
/// The upstream ingester returned an error response.
#[error("upstream ingester error: {0}")]
Upstream(#[from] tonic::Status),
/// The RPC client returned an error.
#[error(transparent)]
Client(#[from] RpcWriteClientError),
/// The RPC call timed out after [`RPC_TIMEOUT`] length of time.
#[error("timeout writing to upstream ingester")]
@ -47,10 +48,6 @@ pub enum RpcWriteError {
#[error("no healthy upstream ingesters available")]
NoUpstreams,
/// The upstream connection is not established.
#[error("upstream {0} is not connected")]
UpstreamNotConnected(String),
/// The write request was not attempted, because not enough upstream
/// ingesters needed to satisfy the configured replication factor are
/// healthy.
@ -237,7 +234,7 @@ where
// This error is an internal implementation detail - the
// meaningful error for the user is "there's no healthy
// upstreams".
RpcWriteError::UpstreamNotConnected(_) => RpcWriteError::NoUpstreams,
RpcWriteError::Client(_) => RpcWriteError::NoUpstreams,
// The number of upstreams no longer satisfies the desired
// replication factor.
RpcWriteError::NoUpstreams => RpcWriteError::NotEnoughReplicas,
@ -304,7 +301,7 @@ where
.await
.map_err(|e| match last_err {
// Any other error is returned as-is.
Some(v) => v,
Some(v) => RpcWriteError::Client(v),
// If the entire write attempt fails during the first RPC write
// request, then the per-request timeout is greater than the write
// attempt timeout, and therefore only one upstream is ever tried.
@ -475,7 +472,7 @@ mod tests {
// Init the write handler with a mock client to capture the rpc calls.
let client1 = Arc::new(MockWriteClient::default().with_ret(iter::once(Err(
RpcWriteError::Upstream(tonic::Status::internal("")),
RpcWriteClientError::Upstream(tonic::Status::internal("")),
))));
let client2 = Arc::new(MockWriteClient::default());
let client3 = Arc::new(MockWriteClient::default());
@ -540,12 +537,12 @@ mod tests {
// The first client in line fails the first request, but will succeed
// the second try.
let client1 = Arc::new(MockWriteClient::default().with_ret([
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
Err(RpcWriteClientError::Upstream(tonic::Status::internal(""))),
Ok(()),
]));
// This client always errors.
let client2 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::Upstream(tonic::Status::internal("")))
Err(RpcWriteClientError::Upstream(tonic::Status::internal("")))
})));
let handler = RpcWrite::new(
@ -618,7 +615,9 @@ mod tests {
#[tokio::test]
async fn test_write_upstream_error() {
let client_1 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas")))
Err(RpcWriteClientError::Upstream(tonic::Status::internal(
"bananas",
)))
})));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
@ -629,18 +628,17 @@ mod tests {
)
.await;
assert_matches!(got, Err(RpcWriteError::Upstream(s)) => {
assert_eq!(s.code(), tonic::Code::Internal);
assert_eq!(s.message(), "bananas");
});
assert_matches!(got, Err(RpcWriteError::NoUpstreams));
}
/// Assert that an [`RpcWriteError::UpstreamNotConnected`] error is mapped
/// Assert that an [`RpcWriteClientError::UpstreamNotConnected`] error is mapped
/// to a user-friendly [`RpcWriteError::NoUpstreams`] for consistency.
#[tokio::test]
async fn test_write_map_upstream_not_connected_error() {
let client_1 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
Err(RpcWriteClientError::UpstreamNotConnected(
"bananas".to_string(),
))
})));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
@ -661,13 +659,17 @@ mod tests {
async fn test_write_not_enough_upstreams_for_replication() {
// Initialise two upstreams, 1 healthy, 1 not.
let client_1 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
Err(RpcWriteClientError::UpstreamNotConnected(
"bananas".to_string(),
))
})));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let client_2 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
Err(RpcWriteClientError::UpstreamNotConnected(
"bananas".to_string(),
))
})));
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(false);
@ -726,7 +728,9 @@ mod tests {
circuit_1.set_healthy(true);
let client_2 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas")))
Err(RpcWriteClientError::Upstream(tonic::Status::internal(
"bananas",
)))
})));
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(true);
@ -768,7 +772,9 @@ mod tests {
circuit_1.set_healthy(true);
let client_2 = Arc::new(MockWriteClient::default().with_ret([
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
Err(RpcWriteClientError::Upstream(tonic::Status::internal(
"bananas",
))),
Ok(()),
]));
let circuit_2 = Arc::new(MockCircuitBreaker::default());
@ -811,8 +817,12 @@ mod tests {
// This client sometimes errors (2 times)
let client_2 = Arc::new(MockWriteClient::default().with_ret([
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
Err(RpcWriteClientError::Upstream(tonic::Status::internal(
"bananas",
))),
Err(RpcWriteClientError::Upstream(tonic::Status::internal(
"bananas",
))),
Ok(()),
]));
let circuit_2 = Arc::new(MockCircuitBreaker::default());
@ -820,7 +830,9 @@ mod tests {
// This client always errors
let client_3 = Arc::new(MockWriteClient::default().with_ret(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
Err(RpcWriteClientError::UpstreamNotConnected(
"bananas".to_string(),
))
})));
let circuit_3 = Arc::new(MockCircuitBreaker::default());
circuit_3.set_healthy(true);

View File

@ -3,7 +3,10 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use super::{circuit_breaker::CircuitBreaker, client::WriteClient, RpcWriteError};
use super::{
circuit_breaker::CircuitBreaker,
client::{RpcWriteClientError, WriteClient},
};
/// An internal abstraction over the health probing & result recording
/// functionality of a circuit breaker.
@ -95,7 +98,7 @@ where
T: WriteClient,
C: CircuitBreakerState,
{
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
let res = self.inner.write(op).await;
self.state.observe(&res);
res
@ -194,9 +197,17 @@ mod tests {
#[tokio::test]
async fn test_observe() {
let circuit_breaker = Arc::new(MockCircuitBreaker::default());
let mock_client = Arc::new(MockWriteClient::default().with_ret(Box::new(
[Ok(()), Err(RpcWriteError::NoUpstreams)].into_iter(),
)));
let mock_client = Arc::new(
MockWriteClient::default().with_ret(Box::new(
[
Ok(()),
Err(RpcWriteClientError::UpstreamNotConnected(
"bananas".to_string(),
)),
]
.into_iter(),
)),
);
let wrapper = CircuitBreakingClient::new(Arc::clone(&mock_client), "bananas")
.with_circuit_breaker(Arc::clone(&circuit_breaker));

View File

@ -4,20 +4,32 @@ use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest,
};
use thiserror::Error;
use super::RpcWriteError;
/// Request errors returned by [`WriteClient`] implementations.
#[derive(Debug, Error)]
pub enum RpcWriteClientError {
/// The upstream connection is not established (lazy connection
/// establishment).
#[error("upstream {0} is not connected")]
UpstreamNotConnected(String),
/// The upstream ingester returned an error response.
#[error("upstream ingester error: {0}")]
Upstream(#[from] tonic::Status),
}
/// An abstract RPC client that pushes `op` to an opaque receiver.
#[async_trait]
pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
/// Write `op` and wait for a response.
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError>;
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError>;
}
/// An implementation of [`WriteClient`] for the tonic gRPC client.
#[async_trait]
impl WriteClient for WriteServiceClient<tonic::transport::Channel> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
WriteServiceClient::write(&mut self.clone(), op).await?;
Ok(())
}
@ -31,7 +43,7 @@ pub mod mock {
struct State {
calls: Vec<WriteRequest>,
ret: Box<dyn Iterator<Item = Result<(), RpcWriteError>> + Send + Sync>,
ret: Box<dyn Iterator<Item = Result<(), RpcWriteClientError>> + Send + Sync>,
}
/// A mock implementation of the [`WriteClient`] for testing purposes.
@ -71,7 +83,7 @@ pub mod mock {
pub(crate) fn with_ret<T, U>(self, ret: T) -> Self
where
T: IntoIterator<IntoIter = U>,
U: Iterator<Item = Result<(), RpcWriteError>> + Send + Sync + 'static,
U: Iterator<Item = Result<(), RpcWriteClientError>> + Send + Sync + 'static,
{
self.state.lock().ret = Box::new(ret.into_iter());
self
@ -80,7 +92,7 @@ pub mod mock {
#[async_trait]
impl WriteClient for Arc<MockWriteClient> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
let mut guard = self.state.lock();
guard.calls.push(op);
guard.ret.next().expect("no mock response")

View File

@ -21,7 +21,7 @@ use tonic::{
Code,
};
use super::{client::WriteClient, RpcWriteError};
use super::client::{RpcWriteClientError, WriteClient};
const RETRY_INTERVAL: Duration = Duration::from_secs(1);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
@ -39,7 +39,7 @@ const MAX_INCOMING_MSG_BYTES: usize = 1024 * 1024; // 1 MiB
/// once a connection has been established, the [`Channel`] internally handles
/// reconnections as needed.
///
/// Returns [`RpcWriteError::UpstreamNotConnected`] when no connection is
/// Returns [`RpcWriteClientError::UpstreamNotConnected`] when no connection is
/// available.
#[derive(Debug)]
pub struct LazyConnector {
@ -94,10 +94,11 @@ impl LazyConnector {
#[async_trait]
impl WriteClient for LazyConnector {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
let conn = self.connection.lock().clone();
let conn =
conn.ok_or_else(|| RpcWriteError::UpstreamNotConnected(self.addr.uri().to_string()))?;
let conn = conn.ok_or_else(|| {
RpcWriteClientError::UpstreamNotConnected(self.addr.uri().to_string())
})?;
match WriteServiceClient::new(conn)
.max_encoding_message_size(self.max_outgoing_msg_bytes)
@ -132,19 +133,15 @@ impl WriteClient for LazyConnector {
/// HTTP proxy would. Unfortunately this is a breaking change in behaviour for
/// networking code like [`tonic`]'s transport implementation, which can no
/// longer easily differentiate network errors from actual application errors.
fn is_envoy_unavailable_error(e: &RpcWriteError) -> bool {
fn is_envoy_unavailable_error(e: &RpcWriteClientError) -> bool {
match e {
RpcWriteError::Upstream(e) if e.code() == Code::Unavailable => e
RpcWriteClientError::Upstream(e) if e.code() == Code::Unavailable => e
.metadata()
.get("server")
.map(|v| v == AsciiMetadataValue::from_static("envoy"))
.unwrap_or(false),
RpcWriteError::Upstream(_)
| RpcWriteError::Timeout(_)
| RpcWriteError::NoUpstreams
| RpcWriteError::UpstreamNotConnected(_)
| RpcWriteError::PartialWrite { .. }
| RpcWriteError::NotEnoughReplicas => false,
RpcWriteClientError::Upstream(_) => false,
RpcWriteClientError::UpstreamNotConnected(_) => unreachable!(),
}
}

View File

@ -23,7 +23,8 @@ use self::write::{
};
use crate::{
dml_handlers::{
DmlError, DmlHandler, PartitionError, RetentionError, RpcWriteError, SchemaError,
client::RpcWriteClientError, DmlError, DmlHandler, PartitionError, RetentionError,
RpcWriteError, SchemaError,
},
namespace_resolver::NamespaceResolver,
};
@ -149,13 +150,17 @@ impl From<&DmlError> for StatusCode {
DmlError::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::Retention(RetentionError::OutsideRetention(_)) => StatusCode::FORBIDDEN,
DmlError::RpcWrite(RpcWriteError::Upstream(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::RpcWrite(RpcWriteError::Client(RpcWriteClientError::Upstream(_))) => {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::RpcWrite(RpcWriteError::Client(
RpcWriteClientError::UpstreamNotConnected(_),
)) => StatusCode::SERVICE_UNAVAILABLE,
DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
DmlError::RpcWrite(
RpcWriteError::NoUpstreams
| RpcWriteError::NotEnoughReplicas
| RpcWriteError::PartialWrite { .. }
| RpcWriteError::UpstreamNotConnected(_),
| RpcWriteError::PartialWrite { .. },
) => StatusCode::SERVICE_UNAVAILABLE,
}
}

View File

@ -34,7 +34,7 @@ pub struct TestConfig {
impl TestConfig {
/// Create a new TestConfig. Tests should use one of the specific
/// configuration setup below, such as [new_router2](Self::new_router2).
/// configuration setup below, such as [new_router](Self::new_router).
fn new(
server_type: ServerType,
dsn: Option<String>,
@ -52,12 +52,12 @@ impl TestConfig {
}
}
/// Create a minimal router2 configuration sharing configuration with the ingester config
pub fn new_router2(ingester_config: &TestConfig) -> Self {
/// Create a minimal router configuration sharing configuration with the ingester config
pub fn new_router(ingester_config: &TestConfig) -> Self {
assert_eq!(ingester_config.server_type(), ServerType::Ingester);
Self::new(
ServerType::Router2,
ServerType::Router,
ingester_config.dsn().to_owned(),
ingester_config.catalog_schema_name(),
)
@ -104,12 +104,12 @@ impl TestConfig {
.with_new_wal()
}
/// Create a minimal querier2 configuration from the specified ingester configuration, using
/// Create a minimal querier configuration from the specified ingester configuration, using
/// the same dsn and object store, and pointing at the specified ingester.
pub fn new_querier2(ingester_config: &TestConfig) -> Self {
pub fn new_querier(ingester_config: &TestConfig) -> Self {
assert_eq!(ingester_config.server_type(), ServerType::Ingester);
Self::new_querier2_without_ingester(ingester_config)
Self::new_querier_without_ingester(ingester_config)
.with_ingester_addresses(&[ingester_config.ingester_base()])
}
@ -123,11 +123,11 @@ impl TestConfig {
.with_existing_object_store(other)
}
/// Create a minimal querier2 configuration from the specified ingester configuration, using
/// Create a minimal querier configuration from the specified ingester configuration, using
/// the same dsn and object store, but without specifying the ingester addresses
pub fn new_querier2_without_ingester(ingester_config: &TestConfig) -> Self {
pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self {
Self::new(
ServerType::Querier2,
ServerType::Querier,
ingester_config.dsn().to_owned(),
ingester_config.catalog_schema_name(),
)

View File

@ -86,7 +86,7 @@ impl MiniCluster {
/// namespace and set of connections
///
/// Note this is an internal implementation -- please use
/// [`create_shared2`](Self::create_shared2) and [`new`](Self::new) to create new MiniClusters.
/// [`create_shared`](Self::create_shared) and [`new`](Self::new) to create new MiniClusters.
fn new_from_fixtures(
router: Option<ServerFixture>,
ingesters: Vec<ServerFixture>,
@ -116,10 +116,10 @@ impl MiniCluster {
///
/// Note: Because the underlying server processes are shared across multiple tests, all users
/// of this `MiniCluster` instance should only modify their own unique namespace.
pub async fn create_shared2(database_url: String) -> Self {
pub async fn create_shared(database_url: String) -> Self {
let start = Instant::now();
let mut shared_servers = GLOBAL_SHARED_SERVERS2.lock().await;
debug!(mutex_wait=?start.elapsed(), "creating standard2 cluster");
let mut shared_servers = GLOBAL_SHARED_SERVERS.lock().await;
debug!(mutex_wait=?start.elapsed(), "creating standard cluster");
// try to reuse existing server processes
if let Some(shared) = shared_servers.take() {
@ -134,34 +134,34 @@ impl MiniCluster {
let new_self = cluster.create().await;
info!(
total_wait=?start.elapsed(),
"created new mini cluster2 from existing cluster"
"created new mini cluster from existing cluster"
);
return new_self;
} else {
info!("some server proceses of previous cluster2 have already returned");
info!("some server proceses of previous cluster have already returned");
}
}
// Have to make a new one
info!("Create a new server2");
let new_cluster = Self::create_non_shared2(database_url).await;
info!("Create a new server");
let new_cluster = Self::create_non_shared(database_url).await;
// Update the shared servers to point at the newly created server proesses
*shared_servers = Some(SharedServers::new(&new_cluster));
new_cluster
}
/// Create a shared "version 2" MiniCluster that has a router, ingester set to essentially
/// Create a shared MiniCluster that has a router, ingester set to essentially
/// never persist data (except on-demand), and querier. Save config for a compactor, but the
/// compactor service should be run on-demand in tests using `compactor run-once` rather than
/// using `run compactor`.
///
/// Note: Because the underlying server processes are shared across multiple tests, all users
/// of this `MiniCluster` instance should only modify their own unique namespace.
pub async fn create_shared2_never_persist(database_url: String) -> Self {
pub async fn create_shared_never_persist(database_url: String) -> Self {
let start = Instant::now();
let mut shared_servers = GLOBAL_SHARED_SERVERS2_NEVER_PERSIST.lock().await;
debug!(mutex_wait=?start.elapsed(), "creating standard2 cluster");
let mut shared_servers = GLOBAL_SHARED_SERVERS_NEVER_PERSIST.lock().await;
debug!(mutex_wait=?start.elapsed(), "creating standard cluster");
// try to reuse existing server processes
if let Some(shared) = shared_servers.take() {
@ -176,30 +176,30 @@ impl MiniCluster {
let new_self = cluster.create().await;
info!(
total_wait=?start.elapsed(),
"created new mini cluster2 from existing cluster"
"created new mini cluster from existing cluster"
);
return new_self;
} else {
info!("some server proceses of previous cluster2 have already returned");
info!("some server proceses of previous cluster have already returned");
}
}
// Have to make a new one
info!("Create a new server2 set to never persist");
let new_cluster = Self::create_non_shared2_never_persist(database_url).await;
info!("Create a new server set to never persist");
let new_cluster = Self::create_non_shared_never_persist(database_url).await;
// Update the shared servers to point at the newly created server proesses
*shared_servers = Some(SharedServers::new(&new_cluster));
new_cluster
}
/// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester,
/// Create a non-shared "standard" MiniCluster that has a router, ingester,
/// querier. Save config for a compactor, but the compactor service should be run on-demand in
/// tests using `compactor run-once` rather than using `run compactor`.
pub async fn create_non_shared2(database_url: String) -> Self {
pub async fn create_non_shared(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config);
let compactor_config = TestConfig::new_compactor(&ingester_config);
// Set up the cluster ====================================
@ -213,14 +213,14 @@ impl MiniCluster {
.with_compactor_config(compactor_config)
}
/// Create a non-shared "version 2" MiniCluster that has a router, ingester set to essentially
/// Create a non-shared MiniCluster that has a router, ingester set to essentially
/// never persist data (except on-demand), and querier. Save config for a compactor, but the
/// compactor service should be run on-demand in tests using `compactor run-once` rather than
/// using `run compactor`.
pub async fn create_non_shared2_never_persist(database_url: String) -> Self {
pub async fn create_non_shared_never_persist(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester_never_persist(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
let router_config = TestConfig::new_router(&ingester_config);
let querier_config = TestConfig::new_querier(&ingester_config);
let compactor_config = TestConfig::new_compactor(&ingester_config);
// Set up the cluster ====================================
@ -233,21 +233,22 @@ impl MiniCluster {
.await
.with_compactor_config(compactor_config)
}
/// Create a non-shared "version 2" MiniCluster that has a router,
/// Create a non-shared MiniCluster that has a router,
/// ingester, and querier. The router and querier will be configured
/// to use the authorization service and will require all requests to
/// be authorized. Save config for a compactor, but the compactor service
/// should be run on-demand in tests using `compactor run-once` rather
/// than using `run compactor`.
pub async fn create_non_shared2_with_authz(
pub async fn create_non_shared_with_authz(
database_url: String,
authz_addr: impl Into<String> + Clone,
) -> Self {
let ingester_config = TestConfig::new_ingester(&database_url);
let router_config =
TestConfig::new_router2(&ingester_config).with_single_tenancy(authz_addr.clone());
TestConfig::new_router(&ingester_config).with_single_tenancy(authz_addr.clone());
let querier_config =
TestConfig::new_querier2(&ingester_config).with_single_tenancy(authz_addr);
TestConfig::new_querier(&ingester_config).with_single_tenancy(authz_addr);
let compactor_config = TestConfig::new_compactor(&ingester_config);
// Set up the cluster ====================================
@ -679,10 +680,8 @@ fn server_from_weak(server: Option<&Weak<TestServer>>) -> Option<Option<Arc<Test
}
}
// For the new server versions. `GLOBAL_SHARED_SERVERS` can be removed and this can be renamed
// when the migration to router2/etc is complete.
static GLOBAL_SHARED_SERVERS2: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));
static GLOBAL_SHARED_SERVERS2_NEVER_PERSIST: Lazy<Mutex<Option<SharedServers>>> =
static GLOBAL_SHARED_SERVERS: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));
static GLOBAL_SHARED_SERVERS_NEVER_PERSIST: Lazy<Mutex<Option<SharedServers>>> =
Lazy::new(|| Mutex::new(None));
async fn next_message(

View File

@ -184,7 +184,7 @@ impl Connections {
let server_type = test_config.server_type();
self.router_grpc_connection = match server_type {
ServerType::AllInOne | ServerType::Router2 => {
ServerType::AllInOne | ServerType::Router => {
let client_base = test_config.addrs().router_grpc_api().client_base();
Some(
grpc_channel(test_config, client_base.as_ref())
@ -208,7 +208,7 @@ impl Connections {
};
self.querier_grpc_connection = match server_type {
ServerType::AllInOne | ServerType::Querier2 => {
ServerType::AllInOne | ServerType::Querier => {
let client_base = test_config.addrs().querier_grpc_api().client_base();
Some(
grpc_channel(test_config, client_base.as_ref())
@ -482,7 +482,7 @@ impl TestServer {
`influxdb_iox compactor run-once` instead"
);
}
ServerType::Router2 => {
ServerType::Router => {
if check_catalog_service_health(
server_type,
connections.router_grpc_connection(),
@ -502,7 +502,7 @@ impl TestServer {
return;
}
}
ServerType::Querier2 => {
ServerType::Querier => {
if check_arrow_service_health(
server_type,
connections.querier_grpc_connection(),

View File

@ -4,8 +4,8 @@ use super::addrs::BindAddresses;
pub enum ServerType {
AllInOne,
Ingester,
Router2,
Querier2,
Router,
Querier,
Compactor,
}
@ -15,8 +15,8 @@ impl ServerType {
match self {
Self::AllInOne => "all-in-one",
Self::Ingester => "ingester",
Self::Router2 => "router2",
Self::Querier2 => "querier",
Self::Router => "router",
Self::Querier => "querier",
Self::Compactor => "compactor",
}
}
@ -73,7 +73,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
addrs.ingester_grpc_api().bind_addr().to_string(),
),
],
ServerType::Router2 => vec![
ServerType::Router => vec![
(
"INFLUXDB_IOX_BIND_ADDR",
addrs.router_http_api().bind_addr().to_string(),
@ -87,7 +87,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
addrs.ingester_grpc_api().bind_addr().to_string(),
),
],
ServerType::Querier2 => vec![
ServerType::Querier => vec![
(
"INFLUXDB_IOX_BIND_ADDR",
addrs.router_http_api().bind_addr().to_string(),