fix: Remove the rpc_write feature flag and use INFLUXDB_IOX_MODE env var instead

Also standardize on the `ingester2` and `router2` names for consistency.

Connects to #6402.
pull/24376/head
Carol (Nichols || Goulding) 2022-12-14 11:06:48 -05:00
parent a47a566cac
commit aec98015d7
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
18 changed files with 81 additions and 177 deletions

View File

@ -259,7 +259,7 @@ jobs:
- cache_restore
- run:
name: Cargo test RPC write path
command: cargo test --workspace --features rpc_write
command: cargo test --workspace
- cache_save
test:

13
Cargo.lock generated
View File

@ -1569,18 +1569,6 @@ dependencies = [
"str-buf",
]
[[package]]
name = "escargot"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5584ba17d7ab26a8a7284f13e5bd196294dd2f2d79773cff29b9e9edef601a6"
dependencies = [
"log",
"once_cell",
"serde",
"serde_json",
]
[[package]]
name = "event-listener"
version = "2.5.3"
@ -5429,7 +5417,6 @@ dependencies = [
"assert_cmd",
"bytes",
"data_types",
"escargot",
"futures",
"generated_types",
"http",

View File

@ -33,7 +33,3 @@ test_helpers = { path = "../test_helpers" }
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support
aws = ["object_store/aws"] # Optional AWS / S3 object store support
# Temporary feature to use the RPC write path instead of the write buffer during the transition
# away from using Kafka.
rpc_write = []

View File

@ -212,7 +212,6 @@ pub struct QuerierConfig {
///
/// for multiple addresses.
#[clap(long = "ingester-addresses", env = "INFLUXDB_IOX_INGESTER_ADDRESSES")]
#[cfg(feature = "rpc_write")]
pub ingester_addresses: Vec<String>,
/// Size of the RAM cache used to store catalog metadata information in bytes.
@ -275,42 +274,14 @@ impl QuerierConfig {
/// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to
/// specify a JSON file containing shard to ingester address mappings, this returns `Err` if
/// there are any problems reading, deserializing, or interpreting the file.
#[cfg(not(feature = "rpc_write"))]
pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
if let Some(file) = &self.shard_to_ingesters_file {
let contents =
fs::read_to_string(file).context(ShardToIngesterFileReadingSnafu { file })?;
let map = deserialize_shard_ingester_map(&contents)?;
if map.is_empty() {
Ok(IngesterAddresses::None)
} else {
Ok(IngesterAddresses::ByShardIndex(map))
}
} else if let Some(contents) = &self.shard_to_ingesters {
let map = deserialize_shard_ingester_map(contents)?;
if map.is_empty() {
Ok(IngesterAddresses::None)
} else {
Ok(IngesterAddresses::ByShardIndex(map))
}
} else {
Ok(IngesterAddresses::None)
}
}
/// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to
/// specify a JSON file containing shard to ingester address mappings, this returns `Err` if
/// there are any problems reading, deserializing, or interpreting the file.
// When we have switched to using the RPC write path and remove the rpc_write feature, this
// method can be changed to be infallible as clap will handle failure to parse the list of
// strings.
// When we have switched to using the RPC write path only, this method can be changed to be
// infallible as clap will handle failure to parse the list of strings.
//
// For now, to enable turning on the `rpc_write` feature in tests but not necessarily switching
// into the RPC write path mode, require *both* the feature flag to be enabled *and*
// `--ingester-addresses` to be set in order to switch. If the `rpc_write` feature is enabled
// and `--shard-to-ingesters*` are set, use the write buffer path instead.
#[cfg(feature = "rpc_write")]
// Switching into the RPC write path mode requires *both* the `INFLUXDB_IOX_MODE` environment
// variable to be specified *and* `--ingester-addresses` to be set in order to switch. If the
// `INFLUXDB_IOX_MODE` is set and `--shard-to-ingesters*` is specified, use the write buffer
// path instead.
pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
if let Some(file) = &self.shard_to_ingesters_file {
let contents =
@ -354,18 +325,6 @@ impl QuerierConfig {
pub fn max_concurrent_queries(&self) -> usize {
self.max_concurrent_queries
}
/// Whether the querier is contacting ingesters that use the RPC write path or not.
#[cfg(feature = "rpc_write")]
pub fn rpc_write(&self) -> bool {
true
}
/// Whether the querier is contacting ingesters that use the RPC write path or not.
#[cfg(not(feature = "rpc_write"))]
pub fn rpc_write(&self) -> bool {
false
}
}
fn deserialize_shard_ingester_map(

View File

@ -106,7 +106,3 @@ jemalloc_replacing_malloc = ["tikv-jemalloc-sys", "tikv-jemalloc-ctl"]
# Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
# linting
clippy = []
# Temporary feature to use the RPC write path instead of the write buffer during the transition
# away from using Kafka.
rpc_write = ["ioxd_router/rpc_write", "clap_blocks/rpc_write", "test_helpers_end_to_end/rpc_write"]

View File

@ -465,8 +465,7 @@ impl Config {
num_query_threads: None, // will be ignored
shard_to_ingesters_file: None, // will be ignored
shard_to_ingesters: None, // will be ignored
#[cfg(feature = "rpc_write")]
ingester_addresses: vec![], // will be ignored
ingester_addresses: vec![], // will be ignored
ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes,
ram_pool_data_bytes: querier_ram_pool_data_bytes,
max_concurrent_queries: querier_max_concurrent_queries,
@ -620,6 +619,7 @@ pub async fn command(config: Config) -> Result<()> {
time_provider,
ingester_addresses,
querier_config,
rpc_write: false,
})
.await?;

View File

@ -5,12 +5,10 @@ pub(crate) mod all_in_one;
mod compactor;
mod garbage_collector;
mod ingester;
#[cfg(feature = "rpc_write")]
mod ingester2;
mod main;
mod querier;
mod router;
#[cfg(feature = "rpc_write")]
mod router_rpc_write;
mod test;
@ -29,14 +27,12 @@ pub enum Error {
#[snafu(display("Error in router subcommand: {}", source))]
RouterError { source: router::Error },
#[cfg(feature = "rpc_write")]
#[snafu(display("Error in router-rpc-write subcommand: {}", source))]
RouterRpcWriteError { source: router_rpc_write::Error },
#[snafu(display("Error in ingester subcommand: {}", source))]
IngesterError { source: ingester::Error },
#[cfg(feature = "rpc_write")]
#[snafu(display("Error in ingester2 subcommand: {}", source))]
Ingester2Error { source: ingester2::Error },
@ -67,10 +63,8 @@ impl Config {
Some(Command::GarbageCollector(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
#[cfg(feature = "rpc_write")]
Some(Command::RouterRpcWrite(config)) => config.run_config.logging_config(),
Some(Command::Ingester(config)) => config.run_config.logging_config(),
#[cfg(feature = "rpc_write")]
Some(Command::Ingester2(config)) => config.run_config.logging_config(),
Some(Command::AllInOne(config)) => &config.logging_config,
Some(Command::Test(config)) => config.run_config.logging_config(),
@ -90,14 +84,12 @@ enum Command {
Router(router::Config),
/// Run the server in router mode using the RPC write path.
#[cfg(feature = "rpc_write")]
RouterRpcWrite(router_rpc_write::Config),
/// Run the server in ingester mode
Ingester(ingester::Config),
/// Run the server in ingester2 mode
#[cfg(feature = "rpc_write")]
Ingester2(ingester2::Config),
/// Run the server in "all in one" mode (Default)
@ -123,12 +115,10 @@ pub async fn command(config: Config) -> Result<()> {
.context(GarbageCollectorSnafu),
Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
#[cfg(feature = "rpc_write")]
Some(Command::RouterRpcWrite(config)) => router_rpc_write::command(config)
.await
.context(RouterRpcWriteSnafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
#[cfg(feature = "rpc_write")]
Some(Command::Ingester2(config)) => {
ingester2::command(config).await.context(Ingester2Snafu)
}

View File

@ -4,9 +4,7 @@ use crate::process_info::setup_metric_registry;
use super::main;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig,
object_store::make_object_store,
querier::{IngesterAddresses, QuerierConfig},
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig,
run_config::RunConfig,
};
use iox_query::exec::Executor;
@ -98,14 +96,14 @@ pub async fn command(config: Config) -> Result<(), Error> {
let num_threads = num_query_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
let ingester_addresses = config.querier_config.ingester_addresses()?;
if config.querier_config.rpc_write() && matches!(ingester_addresses, IngesterAddresses::List(_))
{
let rpc_write = std::env::var("INFLUXDB_IOX_MODE").is_ok();
if rpc_write {
info!("using the RPC write path");
} else {
info!("using the write buffer path");
}
let ingester_addresses = config.querier_config.ingester_addresses()?;
info!(?ingester_addresses, "using ingester addresses");
let exec = Arc::new(Executor::new(
@ -122,6 +120,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
time_provider,
ingester_addresses,
querier_config: config.querier_config,
rpc_write,
})
.await?;

View File

@ -10,7 +10,7 @@ use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_router::create_router_grpc_write_server_type;
use ioxd_router::create_router2_server_type;
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
@ -70,7 +70,7 @@ pub async fn command(config: Config) -> Result<()> {
let catalog = config
.catalog_dsn
.get_catalog("router_rpc_write", Arc::clone(&metrics))
.get_catalog("router2", Arc::clone(&metrics))
.await?;
let object_store = make_object_store(config.run_config.object_store_config())
@ -82,7 +82,7 @@ pub async fn command(config: Config) -> Result<()> {
&metrics,
));
let server_type = create_router_grpc_write_server_type(
let server_type = create_router2_server_type(
&common_state,
Arc::clone(&metrics),
catalog,
@ -91,7 +91,7 @@ pub async fn command(config: Config) -> Result<()> {
)
.await?;
info!("starting router_rpc_write");
info!("starting router2");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metrics).await?)
}

View File

@ -89,7 +89,6 @@ async fn ingester_flight_api() {
});
}
#[cfg(feature = "rpc_write")]
#[tokio::test]
async fn ingester2_flight_api() {
test_helpers::maybe_start_logging();
@ -98,7 +97,7 @@ async fn ingester2_flight_api() {
let table_name = "mytable";
// Set up cluster
let mut cluster = MiniCluster::create_non_shared_rpc_write(database_url).await;
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
// Write some data into the v2 HTTP API ==============
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);

View File

@ -150,6 +150,7 @@ pub struct QuerierServerTypeArgs<'a> {
pub time_provider: Arc<dyn TimeProvider>,
pub ingester_addresses: IngesterAddresses,
pub querier_config: QuerierConfig,
pub rpc_write: bool,
}
#[derive(Debug, Error)]
@ -186,23 +187,36 @@ pub async fn create_querier_server_type(
);
assert!(existing.is_none());
let rpc_write = args.querier_config.rpc_write()
&& matches!(args.ingester_addresses, IngesterAddresses::List(_));
let ingester_connection = match args.ingester_addresses {
IngesterAddresses::None => None,
IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections(
Some(map),
None,
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
)),
IngesterAddresses::List(list) => Some(create_ingester_connections(
None,
Some(list),
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
)),
IngesterAddresses::ByShardIndex(map) => {
if args.rpc_write {
panic!(
"`INFLUXDB_IOX_MODE` is set but shard to ingester mappings were provided; \
either unset `INFLUXDB_IOX_MODE` or specify `--ingester-addresses` instead"
);
}
Some(create_ingester_connections(
Some(map),
None,
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
))
}
IngesterAddresses::List(list) => {
if !args.rpc_write {
panic!(
"`INFLUXDB_IOX_MODE` is unset but ingester addresses were provided; \
either set `INFLUXDB_IOX_MODE` or specify shard to ingester mappings instead"
);
}
Some(create_ingester_connections(
None,
Some(list),
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
))
}
};
let database = Arc::new(
@ -212,7 +226,7 @@ pub async fn create_querier_server_type(
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries(),
rpc_write,
args.rpc_write,
)
.await?,
);

View File

@ -29,9 +29,3 @@ thiserror = "1.0.37"
tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = { version = "0.7.4" }
workspace-hack = { path = "../workspace-hack"}
[features]
# Temporary feature to use the RPC write path instead of the write buffer during the transition
# away from using Kafka.
rpc_write = []

View File

@ -248,7 +248,7 @@ impl HttpApiErrorSource for IoxHttpErrorAdaptor {
// NOTE!!! This needs to be kept in sync with `create_router_server_type` until the
// switch to the RPC write path/ingester2 is complete! See the numbered sections that annotate
// where these two functions line up and where they diverge.
pub async fn create_router_grpc_write_server_type(
pub async fn create_router2_server_type(
common_state: &CommonServerState,
metrics: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
@ -423,7 +423,7 @@ pub async fn create_router_grpc_write_server_type(
}
/// Instantiate a router server
// NOTE!!! This needs to be kept in sync with `create_router_grpc_write_server_type` until the
// NOTE!!! This needs to be kept in sync with `create_router2_server_type` until the
// switch to the RPC write path/ingester2 is complete! See the numbered sections that annotate
// where these two functions line up and where they diverge.
pub async fn create_router_server_type(
@ -435,7 +435,7 @@ pub async fn create_router_server_type(
router_config: &RouterConfig,
) -> Result<Arc<dyn ServerType>> {
// 1. START: Different Setup Per Router Path: this part is only relevant to using a write
// buffer and should not be added to `create_router_grpc_write_server_type`.
// buffer and should not be added to `create_router2_server_type`.
// Initialise the sharded write buffer and instrument it with DML handler metrics.
let (write_buffer, sharder) = init_write_buffer(

View File

@ -11,7 +11,6 @@ arrow_util = { path = "../arrow_util" }
assert_cmd = "2.0.7"
bytes = "1.3"
data_types = { path = "../data_types" }
escargot = "0.5"
futures = "0.3"
generated_types = { path = "../generated_types" }
http = "0.2.8"
@ -31,8 +30,3 @@ tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-mult
tokio-util = "0.7"
tonic = "0.8"
workspace-hack = { path = "../workspace-hack"}
[features]
# Temporary feature to use the RPC write path instead of the write buffer during the transition
# away from using Kafka.
rpc_write = []

View File

@ -65,16 +65,17 @@ impl TestConfig {
.with_new_object_store()
}
/// Create a minimal router configuration sharing configuration with the ingester config
pub fn new_router_rpc_write(ingester_config: &TestConfig) -> Self {
assert_eq!(ingester_config.server_type(), ServerType::IngesterRpcWrite);
/// Create a minimal router2 configuration sharing configuration with the ingester2 config
pub fn new_router2(ingester_config: &TestConfig) -> Self {
assert_eq!(ingester_config.server_type(), ServerType::Ingester2);
Self::new(
ServerType::RouterRpcWrite,
ServerType::Router2,
ingester_config.dsn().to_owned(),
ingester_config.catalog_schema_name(),
)
.with_existing_object_store(ingester_config)
.with_env("INFLUXDB_IOX_MODE", "2")
.with_env(
"INFLUXDB_IOX_INGESTER_ADDRESSES",
ingester_config
@ -98,17 +99,13 @@ impl TestConfig {
.with_default_ingester_options()
}
/// Create a minimal ingester configuration, using the dsn configuration from other
pub fn new_ingester_rpc_write(dsn: impl Into<String>) -> Self {
/// Create a minimal ingester2 configuration, using the dsn configuration specified
pub fn new_ingester2(dsn: impl Into<String>) -> Self {
let dsn = Some(dsn.into());
Self::new(
ServerType::IngesterRpcWrite,
dsn,
random_catalog_schema_name(),
)
.with_new_object_store()
.with_new_wal()
.with_default_ingester_options()
Self::new(ServerType::Ingester2, dsn, random_catalog_schema_name())
.with_new_object_store()
.with_new_wal()
.with_default_ingester_options()
}
/// Create a minimal querier configuration from the specified

View File

@ -152,9 +152,9 @@ impl MiniCluster {
.with_compactor_config(compactor_config)
}
pub async fn create_non_shared_rpc_write(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester_rpc_write(&database_url);
let router_config = TestConfig::new_router_rpc_write(&ingester_config);
pub async fn create_non_shared2(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
// Set up the cluster ====================================
Self::new()

View File

@ -1,3 +1,4 @@
use assert_cmd::cargo::CommandCargoExt;
use futures::prelude::*;
use influxdb_iox_client::connection::Connection;
use observability_deps::tracing::{info, warn};
@ -6,7 +7,7 @@ use std::{
fs::OpenOptions,
ops::DerefMut,
path::Path,
process::Child,
process::{Child, Command},
str,
sync::{Arc, Weak},
time::Duration,
@ -184,7 +185,7 @@ impl Connections {
let server_type = test_config.server_type();
self.router_grpc_connection = match server_type {
ServerType::AllInOne | ServerType::Router | ServerType::RouterRpcWrite => {
ServerType::AllInOne | ServerType::Router | ServerType::Router2 => {
let client_base = test_config.addrs().router_grpc_api().client_base();
Some(
grpc_channel(test_config, client_base.as_ref())
@ -198,7 +199,7 @@ impl Connections {
};
self.ingester_grpc_connection = match server_type {
ServerType::AllInOne | ServerType::Ingester | ServerType::IngesterRpcWrite => {
ServerType::AllInOne | ServerType::Ingester | ServerType::Ingester2 => {
let client_base = test_config.addrs().ingester_grpc_api().client_base();
Some(
grpc_channel(test_config, client_base.as_ref())
@ -336,7 +337,7 @@ impl TestServer {
let run_command_name = server_type.run_command();
let mut command = cargo_run_command();
let mut command = Command::cargo_bin("influxdb_iox").unwrap();
let mut command = command
.arg("run")
.arg(run_command_name)
@ -488,7 +489,7 @@ impl TestServer {
`influxdb_iox compactor run-once` instead"
);
}
ServerType::Router | ServerType::RouterRpcWrite => {
ServerType::Router | ServerType::Router2 => {
if check_catalog_service_health(
server_type,
connections.router_grpc_connection(),
@ -498,7 +499,7 @@ impl TestServer {
return;
}
}
ServerType::Ingester | ServerType::IngesterRpcWrite => {
ServerType::Ingester | ServerType::Ingester2 => {
if check_arrow_service_health(
server_type,
connections.ingester_grpc_connection(),
@ -546,30 +547,6 @@ impl TestServer {
}
}
// Build the command, with the `rpc_write` feature enabled to allow testing of the RPC
// write path.
// This will inherit environment from the test runner, in particular, `LOG_FILTER`
#[cfg(feature = "rpc_write")]
fn cargo_run_command() -> std::process::Command {
escargot::CargoBuild::new()
.bin("influxdb_iox")
.features("rpc_write")
.run()
.unwrap()
.command()
}
// Build the command, WITHOUT the `rpc_write` feature enabled, to not clobber the build.
// This will inherit environment from the test runner, in particular, `LOG_FILTER`
#[cfg(not(feature = "rpc_write"))]
fn cargo_run_command() -> std::process::Command {
escargot::CargoBuild::new()
.bin("influxdb_iox")
.run()
.unwrap()
.command()
}
/// checks catalog service health, as a proxy for all gRPC
/// services. Returns false if the service should be checked again
async fn check_catalog_service_health(server_type: ServerType, connection: Connection) -> bool {

View File

@ -4,9 +4,9 @@ use super::addrs::BindAddresses;
pub enum ServerType {
AllInOne,
Ingester,
IngesterRpcWrite,
Ingester2,
Router,
RouterRpcWrite,
Router2,
Querier,
Compactor,
}
@ -17,9 +17,9 @@ impl ServerType {
match self {
Self::AllInOne => "all-in-one",
Self::Ingester => "ingester",
Self::IngesterRpcWrite => "ingester2",
Self::Ingester2 => "ingester2",
Self::Router => "router",
Self::RouterRpcWrite => "router-rpc-write",
Self::Router2 => "router2",
Self::Querier => "querier",
Self::Compactor => "compactor",
}
@ -77,7 +77,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
addrs.ingester_grpc_api().bind_addr().to_string(),
),
],
ServerType::IngesterRpcWrite => vec![
ServerType::Ingester2 => vec![
(
"INFLUXDB_IOX_BIND_ADDR",
addrs.router_http_api().bind_addr().to_string(),
@ -86,6 +86,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
"INFLUXDB_IOX_GRPC_BIND_ADDR",
addrs.ingester_grpc_api().bind_addr().to_string(),
),
("INFLUXDB_IOX_MODE", "2".to_string()),
],
ServerType::Router => vec![
(
@ -97,7 +98,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
addrs.router_grpc_api().bind_addr().to_string(),
),
],
ServerType::RouterRpcWrite => vec![
ServerType::Router2 => vec![
(
"INFLUXDB_IOX_BIND_ADDR",
addrs.router_http_api().bind_addr().to_string(),
@ -110,6 +111,7 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
"INFLUXDB_IOX_INGESTER_ADDRESSES",
addrs.ingester_grpc_api().bind_addr().to_string(),
),
("INFLUXDB_IOX_MODE", "2".to_string()),
],
ServerType::Querier => vec![
(