diff --git a/.circleci/config.yml b/.circleci/config.yml index fad3f798cd..0268612689 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -259,7 +259,7 @@ jobs: - cache_restore - run: name: Cargo test RPC write path - command: cargo test --workspace --features rpc_write + command: cargo test --workspace - cache_save test: diff --git a/Cargo.lock b/Cargo.lock index b168c01a1c..11e09f0e01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1569,18 +1569,6 @@ dependencies = [ "str-buf", ] -[[package]] -name = "escargot" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5584ba17d7ab26a8a7284f13e5bd196294dd2f2d79773cff29b9e9edef601a6" -dependencies = [ - "log", - "once_cell", - "serde", - "serde_json", -] - [[package]] name = "event-listener" version = "2.5.3" @@ -5429,7 +5417,6 @@ dependencies = [ "assert_cmd", "bytes", "data_types", - "escargot", "futures", "generated_types", "http", diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml index 7fa3508599..0f97e01cdb 100644 --- a/clap_blocks/Cargo.toml +++ b/clap_blocks/Cargo.toml @@ -33,7 +33,3 @@ test_helpers = { path = "../test_helpers" } azure = ["object_store/azure"] # Optional Azure Object store support gcp = ["object_store/gcp"] # Optional GCP object store support aws = ["object_store/aws"] # Optional AWS / S3 object store support - -# Temporary feature to use the RPC write path instead of the write buffer during the transition -# away from using Kafka. -rpc_write = [] diff --git a/clap_blocks/src/querier.rs b/clap_blocks/src/querier.rs index 955b09cf27..7b353f6705 100644 --- a/clap_blocks/src/querier.rs +++ b/clap_blocks/src/querier.rs @@ -212,7 +212,6 @@ pub struct QuerierConfig { /// /// for multiple addresses. #[clap(long = "ingester-addresses", env = "INFLUXDB_IOX_INGESTER_ADDRESSES")] - #[cfg(feature = "rpc_write")] pub ingester_addresses: Vec, /// Size of the RAM cache used to store catalog metadata information in bytes. @@ -275,42 +274,14 @@ impl QuerierConfig { /// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to /// specify a JSON file containing shard to ingester address mappings, this returns `Err` if /// there are any problems reading, deserializing, or interpreting the file. - #[cfg(not(feature = "rpc_write"))] - pub fn ingester_addresses(&self) -> Result { - if let Some(file) = &self.shard_to_ingesters_file { - let contents = - fs::read_to_string(file).context(ShardToIngesterFileReadingSnafu { file })?; - let map = deserialize_shard_ingester_map(&contents)?; - if map.is_empty() { - Ok(IngesterAddresses::None) - } else { - Ok(IngesterAddresses::ByShardIndex(map)) - } - } else if let Some(contents) = &self.shard_to_ingesters { - let map = deserialize_shard_ingester_map(contents)?; - if map.is_empty() { - Ok(IngesterAddresses::None) - } else { - Ok(IngesterAddresses::ByShardIndex(map)) - } - } else { - Ok(IngesterAddresses::None) - } - } - /// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to - /// specify a JSON file containing shard to ingester address mappings, this returns `Err` if - /// there are any problems reading, deserializing, or interpreting the file. - - // When we have switched to using the RPC write path and remove the rpc_write feature, this - // method can be changed to be infallible as clap will handle failure to parse the list of - // strings. + // When we have switched to using the RPC write path only, this method can be changed to be + // infallible as clap will handle failure to parse the list of strings. // - // For now, to enable turning on the `rpc_write` feature in tests but not necessarily switching - // into the RPC write path mode, require *both* the feature flag to be enabled *and* - // `--ingester-addresses` to be set in order to switch. If the `rpc_write` feature is enabled - // and `--shard-to-ingesters*` are set, use the write buffer path instead. - #[cfg(feature = "rpc_write")] + // Switching into the RPC write path mode requires *both* the `INFLUXDB_IOX_MODE` environment + // variable to be specified *and* `--ingester-addresses` to be set in order to switch. If the + // `INFLUXDB_IOX_MODE` is enabled and `--shard-to-ingesters*` is set, use the write buffer path + // instead. pub fn ingester_addresses(&self) -> Result { if let Some(file) = &self.shard_to_ingesters_file { let contents = @@ -354,18 +325,6 @@ impl QuerierConfig { pub fn max_concurrent_queries(&self) -> usize { self.max_concurrent_queries } - - /// Whether the querier is contacting ingesters that use the RPC write path or not. - #[cfg(feature = "rpc_write")] - pub fn rpc_write(&self) -> bool { - true - } - - /// Whether the querier is contacting ingesters that use the RPC write path or not. - #[cfg(not(feature = "rpc_write"))] - pub fn rpc_write(&self) -> bool { - false - } } fn deserialize_shard_ingester_map( diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 7032f06658..cb6e8ded2f 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -106,7 +106,3 @@ jemalloc_replacing_malloc = ["tikv-jemalloc-sys", "tikv-jemalloc-ctl"] # Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during # linting clippy = [] - -# Temporary feature to use the RPC write path instead of the write buffer during the transition -# away from using Kafka. -rpc_write = ["ioxd_router/rpc_write", "clap_blocks/rpc_write", "test_helpers_end_to_end/rpc_write"] diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 91b70954ee..20a3eb89ea 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -465,8 +465,7 @@ impl Config { num_query_threads: None, // will be ignored shard_to_ingesters_file: None, // will be ignored shard_to_ingesters: None, // will be ignored - #[cfg(feature = "rpc_write")] - ingester_addresses: vec![], // will be ignored + ingester_addresses: vec![], // will be ignored ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes, ram_pool_data_bytes: querier_ram_pool_data_bytes, max_concurrent_queries: querier_max_concurrent_queries, @@ -620,6 +619,7 @@ pub async fn command(config: Config) -> Result<()> { time_provider, ingester_addresses, querier_config, + rpc_write: false, }) .await?; diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 0da818e318..64d13d7621 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -5,12 +5,10 @@ pub(crate) mod all_in_one; mod compactor; mod garbage_collector; mod ingester; -#[cfg(feature = "rpc_write")] mod ingester2; mod main; mod querier; mod router; -#[cfg(feature = "rpc_write")] mod router_rpc_write; mod test; @@ -29,14 +27,12 @@ pub enum Error { #[snafu(display("Error in router subcommand: {}", source))] RouterError { source: router::Error }, - #[cfg(feature = "rpc_write")] #[snafu(display("Error in router-rpc-write subcommand: {}", source))] RouterRpcWriteError { source: router_rpc_write::Error }, #[snafu(display("Error in ingester subcommand: {}", source))] IngesterError { source: ingester::Error }, - #[cfg(feature = "rpc_write")] #[snafu(display("Error in ingester2 subcommand: {}", source))] Ingester2Error { source: ingester2::Error }, @@ -67,10 +63,8 @@ impl Config { Some(Command::GarbageCollector(config)) => config.run_config.logging_config(), Some(Command::Querier(config)) => config.run_config.logging_config(), Some(Command::Router(config)) => config.run_config.logging_config(), - #[cfg(feature = "rpc_write")] Some(Command::RouterRpcWrite(config)) => config.run_config.logging_config(), Some(Command::Ingester(config)) => config.run_config.logging_config(), - #[cfg(feature = "rpc_write")] Some(Command::Ingester2(config)) => config.run_config.logging_config(), Some(Command::AllInOne(config)) => &config.logging_config, Some(Command::Test(config)) => config.run_config.logging_config(), @@ -90,14 +84,12 @@ enum Command { Router(router::Config), /// Run the server in router mode using the RPC write path. - #[cfg(feature = "rpc_write")] RouterRpcWrite(router_rpc_write::Config), /// Run the server in ingester mode Ingester(ingester::Config), /// Run the server in ingester2 mode - #[cfg(feature = "rpc_write")] Ingester2(ingester2::Config), /// Run the server in "all in one" mode (Default) @@ -123,12 +115,10 @@ pub async fn command(config: Config) -> Result<()> { .context(GarbageCollectorSnafu), Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), - #[cfg(feature = "rpc_write")] Some(Command::RouterRpcWrite(config)) => router_rpc_write::command(config) .await .context(RouterRpcWriteSnafu), Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu), - #[cfg(feature = "rpc_write")] Some(Command::Ingester2(config)) => { ingester2::command(config).await.context(Ingester2Snafu) } diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index ea6ce6d9ce..119de1f796 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -4,9 +4,7 @@ use crate::process_info::setup_metric_registry; use super::main; use clap_blocks::{ - catalog_dsn::CatalogDsnConfig, - object_store::make_object_store, - querier::{IngesterAddresses, QuerierConfig}, + catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig, run_config::RunConfig, }; use iox_query::exec::Executor; @@ -98,14 +96,14 @@ pub async fn command(config: Config) -> Result<(), Error> { let num_threads = num_query_threads.unwrap_or_else(num_cpus::get); info!(%num_threads, "using specified number of threads per thread pool"); - let ingester_addresses = config.querier_config.ingester_addresses()?; - if config.querier_config.rpc_write() && matches!(ingester_addresses, IngesterAddresses::List(_)) - { + let rpc_write = std::env::var("INFLUXDB_IOX_MODE").is_ok(); + if rpc_write { info!("using the RPC write path"); } else { info!("using the write buffer path"); } + let ingester_addresses = config.querier_config.ingester_addresses()?; info!(?ingester_addresses, "using ingester addresses"); let exec = Arc::new(Executor::new( @@ -122,6 +120,7 @@ pub async fn command(config: Config) -> Result<(), Error> { time_provider, ingester_addresses, querier_config: config.querier_config, + rpc_write, }) .await?; diff --git a/influxdb_iox/src/commands/run/router_rpc_write.rs b/influxdb_iox/src/commands/run/router_rpc_write.rs index 024332c6e5..ee1d80323e 100644 --- a/influxdb_iox/src/commands/run/router_rpc_write.rs +++ b/influxdb_iox/src/commands/run/router_rpc_write.rs @@ -10,7 +10,7 @@ use ioxd_common::{ server_type::{CommonServerState, CommonServerStateError}, Service, }; -use ioxd_router::create_router_grpc_write_server_type; +use ioxd_router::create_router2_server_type; use object_store::DynObjectStore; use object_store_metrics::ObjectStoreMetrics; use observability_deps::tracing::*; @@ -70,7 +70,7 @@ pub async fn command(config: Config) -> Result<()> { let catalog = config .catalog_dsn - .get_catalog("router_rpc_write", Arc::clone(&metrics)) + .get_catalog("router2", Arc::clone(&metrics)) .await?; let object_store = make_object_store(config.run_config.object_store_config()) @@ -82,7 +82,7 @@ pub async fn command(config: Config) -> Result<()> { &metrics, )); - let server_type = create_router_grpc_write_server_type( + let server_type = create_router2_server_type( &common_state, Arc::clone(&metrics), catalog, @@ -91,7 +91,7 @@ pub async fn command(config: Config) -> Result<()> { ) .await?; - info!("starting router_rpc_write"); + info!("starting router2"); let services = vec![Service::create(server_type, common_state.run_config())]; Ok(main::main(common_state, services, metrics).await?) } diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index bf011eff1a..fd307927a3 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -89,7 +89,6 @@ async fn ingester_flight_api() { }); } -#[cfg(feature = "rpc_write")] #[tokio::test] async fn ingester2_flight_api() { test_helpers::maybe_start_logging(); @@ -98,7 +97,7 @@ async fn ingester2_flight_api() { let table_name = "mytable"; // Set up cluster - let mut cluster = MiniCluster::create_non_shared_rpc_write(database_url).await; + let mut cluster = MiniCluster::create_non_shared2(database_url).await; // Write some data into the v2 HTTP API ============== let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name); diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index ccea378fea..09cac37057 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -150,6 +150,7 @@ pub struct QuerierServerTypeArgs<'a> { pub time_provider: Arc, pub ingester_addresses: IngesterAddresses, pub querier_config: QuerierConfig, + pub rpc_write: bool, } #[derive(Debug, Error)] @@ -186,23 +187,36 @@ pub async fn create_querier_server_type( ); assert!(existing.is_none()); - let rpc_write = args.querier_config.rpc_write() - && matches!(args.ingester_addresses, IngesterAddresses::List(_)); - let ingester_connection = match args.ingester_addresses { IngesterAddresses::None => None, - IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections( - Some(map), - None, - Arc::clone(&catalog_cache), - args.querier_config.ingester_circuit_breaker_threshold, - )), - IngesterAddresses::List(list) => Some(create_ingester_connections( - None, - Some(list), - Arc::clone(&catalog_cache), - args.querier_config.ingester_circuit_breaker_threshold, - )), + IngesterAddresses::ByShardIndex(map) => { + if args.rpc_write { + panic!( + "`INFLUXDB_IOX_MODE` is set but shard to ingester mappings were provided; \ + either unset `INFLUXDB_IOX_MODE` or specify `--ingester-addresses` instead" + ); + } + Some(create_ingester_connections( + Some(map), + None, + Arc::clone(&catalog_cache), + args.querier_config.ingester_circuit_breaker_threshold, + )) + } + IngesterAddresses::List(list) => { + if !args.rpc_write { + panic!( + "`INFLUXDB_IOX_MODE` is unset but ingester addresses were provided; \ + either set `INFLUXDB_IOX_MODE` or specify shard to ingester mappings instead" + ); + } + Some(create_ingester_connections( + None, + Some(list), + Arc::clone(&catalog_cache), + args.querier_config.ingester_circuit_breaker_threshold, + )) + } }; let database = Arc::new( @@ -212,7 +226,7 @@ pub async fn create_querier_server_type( args.exec, ingester_connection, args.querier_config.max_concurrent_queries(), - rpc_write, + args.rpc_write, ) .await?, ); diff --git a/ioxd_router/Cargo.toml b/ioxd_router/Cargo.toml index caafb5a15f..0fee649d10 100644 --- a/ioxd_router/Cargo.toml +++ b/ioxd_router/Cargo.toml @@ -29,9 +29,3 @@ thiserror = "1.0.37" tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tokio-util = { version = "0.7.4" } workspace-hack = { path = "../workspace-hack"} - -[features] - -# Temporary feature to use the RPC write path instead of the write buffer during the transition -# away from using Kafka. -rpc_write = [] diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index d22035e249..d3347b7f03 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -248,7 +248,7 @@ impl HttpApiErrorSource for IoxHttpErrorAdaptor { // NOTE!!! This needs to be kept in sync with `create_router_server_type` until the // switch to the RPC write path/ingester2 is complete! See the numbered sections that annotate // where these two functions line up and where they diverge. -pub async fn create_router_grpc_write_server_type( +pub async fn create_router2_server_type( common_state: &CommonServerState, metrics: Arc, catalog: Arc, @@ -423,7 +423,7 @@ pub async fn create_router_grpc_write_server_type( } /// Instantiate a router server -// NOTE!!! This needs to be kept in sync with `create_router_grpc_write_server_type` until the +// NOTE!!! This needs to be kept in sync with `create_router2_server_type` until the // switch to the RPC write path/ingester2 is complete! See the numbered sections that annotate // where these two functions line up and where they diverge. pub async fn create_router_server_type( @@ -435,7 +435,7 @@ pub async fn create_router_server_type( router_config: &RouterConfig, ) -> Result> { // 1. START: Different Setup Per Router Path: this part is only relevant to using a write - // buffer and should not be added to `create_router_grpc_write_server_type`. + // buffer and should not be added to `create_router2_server_type`. // Initialise the sharded write buffer and instrument it with DML handler metrics. let (write_buffer, sharder) = init_write_buffer( diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index a1cd303e06..07c5d9d6f0 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -11,7 +11,6 @@ arrow_util = { path = "../arrow_util" } assert_cmd = "2.0.7" bytes = "1.3" data_types = { path = "../data_types" } -escargot = "0.5" futures = "0.3" generated_types = { path = "../generated_types" } http = "0.2.8" @@ -31,8 +30,3 @@ tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-mult tokio-util = "0.7" tonic = "0.8" workspace-hack = { path = "../workspace-hack"} - -[features] -# Temporary feature to use the RPC write path instead of the write buffer during the transition -# away from using Kafka. -rpc_write = [] diff --git a/test_helpers_end_to_end/src/config.rs b/test_helpers_end_to_end/src/config.rs index 6058a22469..a4b2ae44e7 100644 --- a/test_helpers_end_to_end/src/config.rs +++ b/test_helpers_end_to_end/src/config.rs @@ -65,16 +65,17 @@ impl TestConfig { .with_new_object_store() } - /// Create a minimal router configuration sharing configuration with the ingester config - pub fn new_router_rpc_write(ingester_config: &TestConfig) -> Self { - assert_eq!(ingester_config.server_type(), ServerType::IngesterRpcWrite); + /// Create a minimal router2 configuration sharing configuration with the ingester2 config + pub fn new_router2(ingester_config: &TestConfig) -> Self { + assert_eq!(ingester_config.server_type(), ServerType::Ingester2); Self::new( - ServerType::RouterRpcWrite, + ServerType::Router2, ingester_config.dsn().to_owned(), ingester_config.catalog_schema_name(), ) .with_existing_object_store(ingester_config) + .with_env("INFLUXDB_IOX_MODE", "2") .with_env( "INFLUXDB_IOX_INGESTER_ADDRESSES", ingester_config @@ -98,17 +99,13 @@ impl TestConfig { .with_default_ingester_options() } - /// Create a minimal ingester configuration, using the dsn configuration from other - pub fn new_ingester_rpc_write(dsn: impl Into) -> Self { + /// Create a minimal ingester2 configuration, using the dsn configuration specified + pub fn new_ingester2(dsn: impl Into) -> Self { let dsn = Some(dsn.into()); - Self::new( - ServerType::IngesterRpcWrite, - dsn, - random_catalog_schema_name(), - ) - .with_new_object_store() - .with_new_wal() - .with_default_ingester_options() + Self::new(ServerType::Ingester2, dsn, random_catalog_schema_name()) + .with_new_object_store() + .with_new_wal() + .with_default_ingester_options() } /// Create a minimal querier configuration from the specified diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index fc55454912..fe4047ea06 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -152,9 +152,9 @@ impl MiniCluster { .with_compactor_config(compactor_config) } - pub async fn create_non_shared_rpc_write(database_url: String) -> Self { - let ingester_config = TestConfig::new_ingester_rpc_write(&database_url); - let router_config = TestConfig::new_router_rpc_write(&ingester_config); + pub async fn create_non_shared2(database_url: String) -> Self { + let ingester_config = TestConfig::new_ingester2(&database_url); + let router_config = TestConfig::new_router2(&ingester_config); // Set up the cluster ==================================== Self::new() diff --git a/test_helpers_end_to_end/src/server_fixture.rs b/test_helpers_end_to_end/src/server_fixture.rs index 3a5a098ac0..5ca1e80e46 100644 --- a/test_helpers_end_to_end/src/server_fixture.rs +++ b/test_helpers_end_to_end/src/server_fixture.rs @@ -1,3 +1,4 @@ +use assert_cmd::cargo::CommandCargoExt; use futures::prelude::*; use influxdb_iox_client::connection::Connection; use observability_deps::tracing::{info, warn}; @@ -6,7 +7,7 @@ use std::{ fs::OpenOptions, ops::DerefMut, path::Path, - process::Child, + process::{Child, Command}, str, sync::{Arc, Weak}, time::Duration, @@ -184,7 +185,7 @@ impl Connections { let server_type = test_config.server_type(); self.router_grpc_connection = match server_type { - ServerType::AllInOne | ServerType::Router | ServerType::RouterRpcWrite => { + ServerType::AllInOne | ServerType::Router | ServerType::Router2 => { let client_base = test_config.addrs().router_grpc_api().client_base(); Some( grpc_channel(test_config, client_base.as_ref()) @@ -198,7 +199,7 @@ impl Connections { }; self.ingester_grpc_connection = match server_type { - ServerType::AllInOne | ServerType::Ingester | ServerType::IngesterRpcWrite => { + ServerType::AllInOne | ServerType::Ingester | ServerType::Ingester2 => { let client_base = test_config.addrs().ingester_grpc_api().client_base(); Some( grpc_channel(test_config, client_base.as_ref()) @@ -336,7 +337,7 @@ impl TestServer { let run_command_name = server_type.run_command(); - let mut command = cargo_run_command(); + let mut command = Command::cargo_bin("influxdb_iox").unwrap(); let mut command = command .arg("run") .arg(run_command_name) @@ -488,7 +489,7 @@ impl TestServer { `influxdb_iox compactor run-once` instead" ); } - ServerType::Router | ServerType::RouterRpcWrite => { + ServerType::Router | ServerType::Router2 => { if check_catalog_service_health( server_type, connections.router_grpc_connection(), @@ -498,7 +499,7 @@ impl TestServer { return; } } - ServerType::Ingester | ServerType::IngesterRpcWrite => { + ServerType::Ingester | ServerType::Ingester2 => { if check_arrow_service_health( server_type, connections.ingester_grpc_connection(), @@ -546,30 +547,6 @@ impl TestServer { } } -// Build the command, with the `rpc_write` feature enabled to allow testing of the RPC -// write path. -// This will inherit environment from the test runner, in particular, `LOG_FILTER` -#[cfg(feature = "rpc_write")] -fn cargo_run_command() -> std::process::Command { - escargot::CargoBuild::new() - .bin("influxdb_iox") - .features("rpc_write") - .run() - .unwrap() - .command() -} - -// Build the command, WITHOUT the `rpc_write` feature enabled, to not clobber the build. -// This will inherit environment from the test runner, in particular, `LOG_FILTER` -#[cfg(not(feature = "rpc_write"))] -fn cargo_run_command() -> std::process::Command { - escargot::CargoBuild::new() - .bin("influxdb_iox") - .run() - .unwrap() - .command() -} - /// checks catalog service health, as a proxy for all gRPC /// services. Returns false if the service should be checked again async fn check_catalog_service_health(server_type: ServerType, connection: Connection) -> bool { diff --git a/test_helpers_end_to_end/src/server_type.rs b/test_helpers_end_to_end/src/server_type.rs index c26d03d203..161b1a7357 100644 --- a/test_helpers_end_to_end/src/server_type.rs +++ b/test_helpers_end_to_end/src/server_type.rs @@ -4,9 +4,9 @@ use super::addrs::BindAddresses; pub enum ServerType { AllInOne, Ingester, - IngesterRpcWrite, + Ingester2, Router, - RouterRpcWrite, + Router2, Querier, Compactor, } @@ -17,9 +17,9 @@ impl ServerType { match self { Self::AllInOne => "all-in-one", Self::Ingester => "ingester", - Self::IngesterRpcWrite => "ingester2", + Self::Ingester2 => "ingester2", Self::Router => "router", - Self::RouterRpcWrite => "router-rpc-write", + Self::Router2 => "router2", Self::Querier => "querier", Self::Compactor => "compactor", } @@ -77,7 +77,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st addrs.ingester_grpc_api().bind_addr().to_string(), ), ], - ServerType::IngesterRpcWrite => vec![ + ServerType::Ingester2 => vec![ ( "INFLUXDB_IOX_BIND_ADDR", addrs.router_http_api().bind_addr().to_string(), @@ -86,6 +86,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st "INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.ingester_grpc_api().bind_addr().to_string(), ), + ("INFLUXDB_IOX_MODE", "2".to_string()), ], ServerType::Router => vec![ ( @@ -97,7 +98,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st addrs.router_grpc_api().bind_addr().to_string(), ), ], - ServerType::RouterRpcWrite => vec![ + ServerType::Router2 => vec![ ( "INFLUXDB_IOX_BIND_ADDR", addrs.router_http_api().bind_addr().to_string(), @@ -110,6 +111,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st "INFLUXDB_IOX_INGESTER_ADDRESSES", addrs.ingester_grpc_api().bind_addr().to_string(), ), + ("INFLUXDB_IOX_MODE", "2".to_string()), ], ServerType::Querier => vec![ (