diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml
index 0f97e01cdb..7fa3508599 100644
--- a/clap_blocks/Cargo.toml
+++ b/clap_blocks/Cargo.toml
@@ -33,3 +33,7 @@ test_helpers = { path = "../test_helpers" }
 azure = ["object_store/azure"] # Optional Azure Object store support
 gcp = ["object_store/gcp"]     # Optional GCP object store support
 aws = ["object_store/aws"]     # Optional AWS / S3 object store support
+
+# Temporary feature to use the RPC write path instead of the write buffer during the transition
+# away from using Kafka.
+rpc_write = []
diff --git a/clap_blocks/src/querier.rs b/clap_blocks/src/querier.rs
index 4b848f708f..2d5ec0c777 100644
--- a/clap_blocks/src/querier.rs
+++ b/clap_blocks/src/querier.rs
@@ -1,8 +1,14 @@
 //! Querier-related configs.
 use data_types::{IngesterMapping, ShardIndex};
+use snafu::Snafu;
+use std::{collections::HashMap, io, path::PathBuf, sync::Arc};
+
+#[cfg(not(feature = "rpc_write"))]
 use serde::Deserialize;
-use snafu::{ResultExt, Snafu};
-use std::{collections::HashMap, fs, io, path::PathBuf, sync::Arc};
+#[cfg(not(feature = "rpc_write"))]
+use snafu::ResultExt;
+#[cfg(not(feature = "rpc_write"))]
+use std::fs;
 
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -19,6 +25,7 @@ pub enum Error {
         ingesters,
         shards,
     ))]
+    #[cfg(not(feature = "rpc_write"))]
    IgnoreAllRequiresEmptyConfig {
        ingesters: HashMap<Arc<str>, Arc<IngesterConfig>>,
        shards: HashMap<ShardIndex, ShardConfig>,
@@ -130,6 +137,7 @@ pub struct QuerierConfig {
         env = "INFLUXDB_IOX_SHARD_TO_INGESTERS_FILE",
         action
     )]
+    #[cfg(not(feature = "rpc_write"))]
     pub shard_to_ingesters_file: Option<PathBuf>,
 
     /// JSON containing a Shard index to ingesters gRPC mapping. For example:
@@ -199,8 +207,27 @@ pub struct QuerierConfig {
         env = "INFLUXDB_IOX_SHARD_TO_INGESTERS",
         action
     )]
+    #[cfg(not(feature = "rpc_write"))]
     pub shard_to_ingesters: Option<String>,
 
+    /// gRPC address for the querier to talk with the ingesters. For
+    /// example:
+    ///
+    /// "http://127.0.0.1:8083"
+    ///
+    /// or
+    ///
+    /// "http://10.10.10.1:8083,http://10.10.10.2:8083"
+    ///
+    /// for multiple addresses.
+    #[clap(
+        long = "ingester-addresses",
+        env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
+        required = true
+    )]
+    #[cfg(feature = "rpc_write")]
+    pub ingester_addresses: Vec<String>,
+
     /// Size of the RAM cache used to store catalog metadata information in bytes.
     #[clap(
         long = "ram-pool-metadata-bytes",
@@ -261,6 +288,7 @@ impl QuerierConfig {
     /// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to
     /// specify a JSON file containing shard to ingester address mappings, this returns `Err` if
     /// there are any problems reading, deserializing, or interpreting the file.
+    #[cfg(not(feature = "rpc_write"))]
     pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
         if let Some(file) = &self.shard_to_ingesters_file {
             let contents =
@@ -283,6 +311,24 @@ impl QuerierConfig {
         }
     }
 
+    /// Return the querier config's ingester addresses.
+    // When we have switched to using the RPC write path and remove the rpc_write feature, this
+    // method can be changed to be infallible as clap will handle failure to parse the list of
+    // strings.
+    #[cfg(feature = "rpc_write")]
+    pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
+        if self.ingester_addresses.is_empty() {
+            Ok(IngesterAddresses::None)
+        } else {
+            Ok(IngesterAddresses::List(
+                self.ingester_addresses
+                    .iter()
+                    .map(|s| s.as_str().into())
+                    .collect(),
+            ))
+        }
+    }
+
     /// Size of the RAM cache pool for metadata in bytes.
     pub fn ram_pool_metadata_bytes(&self) -> usize {
         self.ram_pool_metadata_bytes
@@ -297,8 +343,21 @@ impl QuerierConfig {
     pub fn max_concurrent_queries(&self) -> usize {
         self.max_concurrent_queries
     }
+
+    /// Whether the querier is contacting ingesters that use the RPC write path or not.
+    #[cfg(feature = "rpc_write")]
+    pub fn rpc_write(&self) -> bool {
+        true
+    }
+
+    /// Whether the querier is contacting ingesters that use the RPC write path or not.
+    #[cfg(not(feature = "rpc_write"))]
+    pub fn rpc_write(&self) -> bool {
+        false
+    }
 }
 
+#[cfg(not(feature = "rpc_write"))]
 fn deserialize_shard_ingester_map(
     contents: &str,
 ) -> Result<HashMap<ShardIndex, IngesterMapping>, Error> {
@@ -375,12 +434,16 @@ pub enum IngesterAddresses {
     /// A mapping from shard index to ingesters.
     ByShardIndex(HashMap<ShardIndex, IngesterMapping>),
 
+    /// A list of ingester2 addresses.
+    List(Vec<Arc<str>>),
+
     /// No connections, meaning only persisted data should be used.
     None,
 }
 
 #[derive(Debug, Deserialize, Default)]
 #[serde(rename_all = "camelCase")]
+#[cfg(not(feature = "rpc_write"))]
 struct IngestersConfig {
     #[serde(default)]
     ignore_all: bool,
@@ -392,6 +455,7 @@ struct IngestersConfig {
 
 /// Ingester config.
 #[derive(Debug, Deserialize)]
+#[cfg(not(feature = "rpc_write"))]
 pub struct IngesterConfig {
     addr: Option<Arc<str>>,
     #[serde(default)]
@@ -400,6 +464,7 @@ pub struct IngesterConfig {
 
 /// Shard config.
 #[derive(Debug, Deserialize)]
+#[cfg(not(feature = "rpc_write"))]
 pub struct ShardConfig {
     ingester: Option<Arc<str>>,
     #[serde(default)]
@@ -407,6 +472,7 @@ pub struct ShardConfig {
 }
 
 #[cfg(test)]
+#[cfg(not(feature = "rpc_write"))] // These tests won't be relevant after the switch to rpc_write.
 mod tests {
     use super::*;
     use clap::Parser;
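The `rpc_write` flavor of `ingester_addresses()` above maps an empty `--ingester-addresses` list to `IngesterAddresses::None` and anything else to `IngesterAddresses::List`. A minimal sketch of how that behaviour could be exercised, assuming `QuerierConfig` derives `clap::Parser` the same way the existing `mod tests` in this file uses it; the test module and the flag values are hypothetical, not part of the patch:

```rust
#[cfg(all(test, feature = "rpc_write"))]
mod rpc_write_config_tests {
    use super::*;
    use clap::Parser;

    #[test]
    fn ingester_addresses_flag_becomes_a_list() {
        // Hypothetical invocation; the flag name follows the #[clap(...)] attribute above.
        let config = QuerierConfig::try_parse_from([
            "querier",
            "--ingester-addresses",
            "http://10.10.10.1:8083",
            "--ingester-addresses",
            "http://10.10.10.2:8083",
        ])
        .unwrap();

        // An explicit, non-empty list should come back as IngesterAddresses::List.
        let addresses = config.ingester_addresses().unwrap();
        assert!(matches!(addresses, IngesterAddresses::List(ref list) if list.len() == 2));
    }
}
```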
diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml
index ece35afb6e..fcb62da14a 100644
--- a/influxdb_iox/Cargo.toml
+++ b/influxdb_iox/Cargo.toml
@@ -109,4 +109,4 @@ clippy = []
 
 # Temporary feature to use the RPC write path instead of the write buffer during the transition
 # away from using Kafka.
-rpc_write = ["ioxd_router/rpc_write"]
+rpc_write = ["ioxd_router/rpc_write", "clap_blocks/rpc_write"]
-rpc_write = ["ioxd_router/rpc_write"] +rpc_write = ["ioxd_router/rpc_write", "clap_blocks/rpc_write"] diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index a605a63d3f..37cdab365a 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -440,9 +440,13 @@ impl Config { }; let querier_config = QuerierConfig { - num_query_threads: None, // will be ignored + num_query_threads: None, // will be ignored + #[cfg(not(feature = "rpc_write"))] shard_to_ingesters_file: None, // will be ignored - shard_to_ingesters: None, // will be ignored + #[cfg(not(feature = "rpc_write"))] + shard_to_ingesters: None, // will be ignored + #[cfg(feature = "rpc_write")] + ingester_addresses: vec![], // will be ignored ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes, ram_pool_data_bytes: querier_ram_pool_data_bytes, max_concurrent_queries: querier_max_concurrent_queries, diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index 6620cda8c8..e7c36963f2 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -96,6 +96,12 @@ pub async fn command(config: Config) -> Result<(), Error> { let num_threads = num_query_threads.unwrap_or_else(num_cpus::get); info!(%num_threads, "using specified number of threads per thread pool"); + if config.querier_config.rpc_write() { + info!("using the RPC write path"); + } else { + info!("using the write buffer path"); + } + let ingester_addresses = config.querier_config.ingester_addresses()?; info!(?ingester_addresses, "using ingester addresses"); diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index c833999293..1ff67af8d0 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -15,7 +15,7 @@ use ioxd_common::{ use metric::Registry; use object_store::DynObjectStore; use querier::{ - create_ingester_connections_by_shard, QuerierCatalogCache, QuerierDatabase, QuerierHandler, + create_ingester_connections, QuerierCatalogCache, QuerierDatabase, QuerierHandler, QuerierHandlerImpl, QuerierServer, }; use std::{ @@ -188,8 +188,15 @@ pub async fn create_querier_server_type( let ingester_connection = match args.ingester_addresses { IngesterAddresses::None => None, - IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections_by_shard( - map, + IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections( + Some(map), + None, + Arc::clone(&catalog_cache), + args.querier_config.ingester_circuit_breaker_threshold, + )), + IngesterAddresses::List(list) => Some(create_ingester_connections( + None, + Some(list), Arc::clone(&catalog_cache), args.querier_config.ingester_circuit_breaker_threshold, )), @@ -202,6 +209,7 @@ pub async fn create_querier_server_type( args.exec, ingester_connection, args.querier_config.max_concurrent_queries(), + args.querier_config.rpc_write(), ) .await?, ); diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 268627d512..0667f64333 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -107,6 +107,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + false, ) .await .unwrap(), @@ -142,6 +143,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + false, ) .await .unwrap(), diff --git a/querier/src/database.rs 
diff --git a/querier/src/database.rs b/querier/src/database.rs
index 0f234c3ea4..59e708c01f 100644
--- a/querier/src/database.rs
+++ b/querier/src/database.rs
@@ -69,7 +69,8 @@ pub struct QuerierDatabase {
     query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
 
     /// Sharder to determine which ingesters to query for a particular table and namespace.
-    sharder: Arc<JumpHash<Arc<ShardIndex>>>,
+    /// Only relevant when using the write buffer; will be None if using RPC write ingesters.
+    sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
 
     /// Chunk prune metrics.
     prune_metrics: Arc<PruneMetrics>,
@@ -106,6 +107,7 @@ impl QuerierDatabase {
         exec: Arc<Executor>,
         ingester_connection: Option<Arc<dyn IngesterConnection>>,
         max_concurrent_queries: usize,
+        rpc_write: bool,
     ) -> Result<Self, Error> {
         assert!(
             max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@@ -128,9 +130,13 @@ impl QuerierDatabase {
         let query_execution_semaphore =
             Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
 
-        let sharder = Arc::new(
-            create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?,
-        );
+        let sharder = if rpc_write {
+            None
+        } else {
+            Some(Arc::new(
+                create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?,
+            ))
+        };
 
         let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry));
 
@@ -172,7 +178,7 @@ impl QuerierDatabase {
             Arc::clone(&self.exec),
             self.ingester_connection.clone(),
             Arc::clone(&self.query_log),
-            Arc::clone(&self.sharder),
+            self.sharder.clone(),
             Arc::clone(&self.prune_metrics),
         )))
     }
@@ -256,6 +262,7 @@ mod tests {
             catalog.exec(),
             Some(create_ingester_connection_for_testing()),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
+            false,
         )
         .await
        .unwrap();
@@ -280,6 +287,7 @@ mod tests {
             catalog.exec(),
             Some(create_ingester_connection_for_testing()),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            false,
         )
         .await,
         Error::NoShards
@@ -305,6 +313,7 @@ mod tests {
             catalog.exec(),
             Some(create_ingester_connection_for_testing()),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            false,
         )
         .await
         .unwrap();
@@ -334,6 +343,7 @@ mod tests {
             catalog.exec(),
             Some(create_ingester_connection_for_testing()),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            false,
         )
         .await
         .unwrap();
diff --git a/querier/src/handler.rs b/querier/src/handler.rs
index d22d9f64a2..5de83819ed 100644
--- a/querier/src/handler.rs
+++ b/querier/src/handler.rs
@@ -224,6 +224,7 @@ mod tests {
             exec,
             Some(create_ingester_connection_for_testing()),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            false,
         )
         .await
         .unwrap(),
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index a2f8efdd34..97cee9ca44 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -139,9 +139,10 @@ pub enum Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// Create a new set of connections given a map of shard indexes to Ingester configurations
-pub fn create_ingester_connections_by_shard(
-    shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
+/// Create a new set of connections given ingester configurations
+pub fn create_ingester_connections(
+    shard_to_ingesters: Option<HashMap<ShardIndex, IngesterMapping>>,
+    ingester_addresses: Option<Vec<Arc<str>>>,
     catalog_cache: Arc<CatalogCache>,
     open_circuit_after_n_errors: u64,
 ) -> Arc<dyn IngesterConnection> {
@@ -161,13 +162,29 @@ pub fn create_ingester_connections_by_shard(
         deadline: None,
     };
 
-    Arc::new(IngesterConnectionImpl::by_shard(
-        shard_to_ingesters,
-        catalog_cache,
-        retry_backoff_config,
-        circuit_breaker_backoff_config,
-        open_circuit_after_n_errors,
-    ))
+    // Exactly one of `shard_to_ingesters` or `ingester_addresses` must be specified.
+    // `shard_to_ingesters` uses the Kafka write buffer path.
+    // `ingester_addresses` uses the RPC write path.
+    match (shard_to_ingesters, ingester_addresses) {
+        (None, None) => panic!("Neither shard_to_ingesters nor ingester_addresses was specified!"),
+        (Some(_), Some(_)) => {
+            panic!("Both shard_to_ingesters and ingester_addresses were specified!")
+        }
+        (Some(shard_to_ingesters), None) => Arc::new(IngesterConnectionImpl::by_shard(
+            shard_to_ingesters,
+            catalog_cache,
+            retry_backoff_config,
+            circuit_breaker_backoff_config,
+            open_circuit_after_n_errors,
+        )),
+        (None, Some(ingester_addresses)) => Arc::new(IngesterConnectionImpl::by_addrs(
+            ingester_addresses,
+            catalog_cache,
+            retry_backoff_config,
+            circuit_breaker_backoff_config,
+            open_circuit_after_n_errors,
+        )),
+    }
 }
 
 /// Create a new ingester suitable for testing
@@ -187,7 +204,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
     #[allow(clippy::too_many_arguments)]
     async fn partitions(
         &self,
-        shard_indexes: &[ShardIndex],
+        shard_indexes: Option<Vec<ShardIndex>>,
         namespace_id: NamespaceId,
         table_id: TableId,
         columns: Vec<String>,
@@ -404,6 +421,36 @@ impl IngesterConnectionImpl {
             backoff_config,
         }
     }
+
+    /// Create a new set of connections given a list of ingester2 addresses.
+    pub fn by_addrs(
+        ingester_addresses: Vec<Arc<str>>,
+        catalog_cache: Arc<CatalogCache>,
+        backoff_config: BackoffConfig,
+        circuit_breaker_backoff_config: BackoffConfig,
+        open_circuit_after_n_errors: u64,
+    ) -> Self {
+        let flight_client = Arc::new(FlightClientImpl::new());
+        let flight_client = Arc::new(CircuitBreakerFlightClient::new(
+            flight_client,
+            catalog_cache.time_provider(),
+            catalog_cache.metric_registry(),
+            open_circuit_after_n_errors,
+            circuit_breaker_backoff_config,
+        ));
+
+        let metric_registry = catalog_cache.metric_registry();
+        let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry));
+
+        Self {
+            shard_to_ingesters: HashMap::new(),
+            unique_ingester_addresses: ingester_addresses.into_iter().collect(),
+            flight_client,
+            catalog_cache,
+            metrics,
+            backoff_config,
+        }
+    }
 }
 
 /// Struct that names all parameters to `execute`
@@ -726,7 +773,7 @@ impl IngesterConnection for IngesterConnectionImpl {
     /// Retrieve chunks from the ingester for the particular table, shard, and predicate
     async fn partitions(
         &self,
-        shard_indexes: &[ShardIndex],
+        shard_indexes: Option<Vec<ShardIndex>>,
         namespace_id: NamespaceId,
         table_id: TableId,
         columns: Vec<String>,
@@ -734,12 +781,49 @@ impl IngesterConnection for IngesterConnectionImpl {
         expected_schema: Arc<Schema>,
         span: Option<Span>,
     ) -> Result<Vec<IngesterPartition>> {
-        // If no shard indexes are specified, no ingester addresses can be found. This is a
-        // configuration problem somewhere.
-        assert!(
-            !shard_indexes.is_empty(),
-            "Called `IngesterConnection.partitions` with an empty `shard_indexes` list",
-        );
+        let relevant_ingester_addresses = match shard_indexes {
+            // If shard indexes is None, we're using the RPC write path, and all ingesters should
+            // be queried.
+            None => self.unique_ingester_addresses.clone(),
+            // If shard indexes is Some([]), no ingester addresses can be found. This is a
+            // configuration problem somewhere.
+            Some(shard_indexes) if shard_indexes.is_empty() => {
+                panic!("Called `IngesterConnection.partitions` with an empty `shard_indexes` list");
+            }
+            // Otherwise, we're using the write buffer and need to look up the ingesters to contact
+            // by their shard index.
+            Some(shard_indexes) => {
+                // Look up the ingesters needed for the shard. Collect into a HashSet to avoid making
+                // multiple requests to the same ingester if that ingester is responsible for multiple
+                // shard_indexes relevant to this query.
+                let mut relevant_ingester_addresses = HashSet::new();
+
+                for shard_index in &shard_indexes {
+                    match self.shard_to_ingesters.get(shard_index) {
+                        None => {
+                            return NoIngesterFoundForShardSnafu {
+                                shard_index: *shard_index,
+                            }
+                            .fail()
+                        }
+                        Some(mapping) => match mapping {
+                            IngesterMapping::Addr(addr) => {
+                                relevant_ingester_addresses.insert(Arc::clone(addr));
+                            }
+                            IngesterMapping::Ignore => (),
+                            IngesterMapping::NotMapped => {
+                                return ShardNotMappedSnafu {
+                                    shard_index: *shard_index,
+                                }
+                                .fail()
+                            }
+                        },
+                    }
+                }
+                relevant_ingester_addresses
+            }
+        };
+
         let mut span_recorder = SpanRecorder::new(span);
         let metrics = Arc::clone(&self.metrics);
 
@@ -797,34 +881,6 @@ impl IngesterConnection for IngesterConnectionImpl {
             }
         };
 
-        // Look up the ingesters needed for the shard. Collect into a HashSet to avoid making
-        // multiple requests to the same ingester if that ingester is responsible for multiple
-        // shard_indexes relevant to this query.
-        let mut relevant_ingester_addresses = HashSet::new();
-
-        for shard_index in shard_indexes {
-            match self.shard_to_ingesters.get(shard_index) {
-                None => {
-                    return NoIngesterFoundForShardSnafu {
-                        shard_index: *shard_index,
-                    }
-                    .fail()
-                }
-                Some(mapping) => match mapping {
-                    IngesterMapping::Addr(addr) => {
-                        relevant_ingester_addresses.insert(Arc::clone(addr));
-                    }
-                    IngesterMapping::Ignore => (),
-                    IngesterMapping::NotMapped => {
-                        return ShardNotMappedSnafu {
-                            shard_index: *shard_index,
-                        }
-                        .fail()
-                    }
-                },
-            }
-        }
-
         let mut ingester_partitions: Vec<IngesterPartition> = relevant_ingester_addresses
             .into_iter()
             .map(move |ingester_address| measured_ingester_request(ingester_address))
@@ -1779,7 +1835,7 @@ mod tests {
         let shard_indexes: Vec<_> = shard_indexes.iter().copied().map(ShardIndex::new).collect();
         ingester_conn
             .partitions(
-                &shard_indexes,
+                Some(shard_indexes),
                 NamespaceId::new(1),
                 TableId::new(2),
                 columns,
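The new `Option<Vec<ShardIndex>>` argument gives `partitions` the three cases spelled out in the comments above: `None` (RPC write path) fans out to every known ingester, `Some([])` is treated as a configuration bug, and `Some(indexes)` resolves addresses through the shard map. A standalone sketch of that dispatch, with the IOx types simplified to plain integers and strings for illustration — the real code uses `ShardIndex`, `IngesterMapping`, and snafu errors, and also handles the `Ignore` mapping:

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for the address selection in `IngesterConnectionImpl::partitions`.
fn relevant_addresses(
    shard_indexes: Option<Vec<i64>>,
    all_ingesters: &HashSet<String>,
    shard_to_ingesters: &HashMap<i64, String>,
) -> Result<HashSet<String>, String> {
    match shard_indexes {
        // RPC write path: no shard mapping exists, so query every configured ingester.
        None => Ok(all_ingesters.clone()),
        // An empty list is a configuration error, mirroring the panic in the patch.
        Some(indexes) if indexes.is_empty() => {
            panic!("called with an empty shard_indexes list")
        }
        // Write buffer path: resolve each shard index through the map; collecting into a
        // HashSet de-duplicates ingesters that own several of the requested shards.
        Some(indexes) => indexes
            .iter()
            .map(|index| {
                shard_to_ingesters
                    .get(index)
                    .cloned()
                    .ok_or_else(|| format!("no ingester found for shard {index}"))
            })
            .collect(),
    }
}
```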
diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs
index 5eeacaa2a1..1213b72487 100644
--- a/querier/src/ingester/test_util.rs
+++ b/querier/src/ingester/test_util.rs
@@ -34,7 +34,7 @@ impl MockIngesterConnection {
 impl IngesterConnection for MockIngesterConnection {
     async fn partitions(
         &self,
-        _shard_indexes: &[ShardIndex],
+        _shard_indexes: Option<Vec<ShardIndex>>,
         _namespace_id: NamespaceId,
         _table_id: TableId,
         columns: Vec<String>,
diff --git a/querier/src/lib.rs b/querier/src/lib.rs
index 02bd1ae999..ea6c973598 100644
--- a/querier/src/lib.rs
+++ b/querier/src/lib.rs
@@ -28,7 +28,7 @@ pub use cache::CatalogCache as QuerierCatalogCache;
 pub use database::{Error as QuerierDatabaseError, QuerierDatabase};
 pub use handler::{QuerierHandler, QuerierHandlerImpl};
 pub use ingester::{
-    create_ingester_connection_for_testing, create_ingester_connections_by_shard,
+    create_ingester_connection_for_testing, create_ingester_connections,
     flight_client::{
         Error as IngesterFlightClientError, FlightClient as IngesterFlightClient,
         QueryData as IngesterFlightClientQueryData,
diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs
index bb308a8cdc..9947d8f6a1 100644
--- a/querier/src/namespace/mod.rs
+++ b/querier/src/namespace/mod.rs
@@ -58,7 +58,7 @@ impl QuerierNamespace {
         exec: Arc<Executor>,
         ingester_connection: Option<Arc<dyn IngesterConnection>>,
         query_log: Arc<QueryLog>,
-        sharder: Arc<JumpHash<Arc<ShardIndex>>>,
+        sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
         prune_metrics: Arc<PruneMetrics>,
     ) -> Self {
         let tables: HashMap<_, _> = ns
@@ -66,7 +66,7 @@ impl QuerierNamespace {
             .iter()
             .map(|(table_name, cached_table)| {
                 let table = Arc::new(QuerierTable::new(QuerierTableArgs {
-                    sharder: Arc::clone(&sharder),
+                    sharder: sharder.clone(),
                     namespace_id: ns.id,
                     namespace_name: Arc::clone(&name),
                     namespace_retention_period: ns.retention_period,
@@ -118,7 +118,7 @@ impl QuerierNamespace {
             exec,
             ingester_connection,
             query_log,
-            sharder,
+            Some(sharder),
             prune_metrics,
         )
     }
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 10799af751..4443aa00b2 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -78,7 +78,7 @@ impl From<Error> for DataFusionError {
 
 /// Args to create a [`QuerierTable`].
 pub struct QuerierTableArgs {
-    pub sharder: Arc<JumpHash<Arc<ShardIndex>>>,
+    pub sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
     pub namespace_id: NamespaceId,
     pub namespace_name: Arc<str>,
     pub namespace_retention_period: Option<Duration>,
@@ -94,8 +94,9 @@ pub struct QuerierTableArgs {
 /// Table representation for the querier.
 #[derive(Debug)]
 pub struct QuerierTable {
-    /// Sharder to query for which shards are responsible for the table's data
-    sharder: Arc<JumpHash<Arc<ShardIndex>>>,
+    /// Sharder to query for which shards are responsible for the table's data. If not specified,
+    /// query all ingesters.
+    sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
 
     /// Namespace the table is in
     namespace_name: Arc<str>,
@@ -481,14 +482,15 @@ impl QuerierTable {
         // determine which ingester(s) to query.
         // Currently, the sharder will only return one shard index per table, but in the
         // near future, the sharder might return more than one shard index for one table.
-        let shard_indexes = vec![**self
+        let shard_indexes = self
             .sharder
-            .shard_for_query(&self.table_name, &self.namespace_name)];
+            .as_ref()
+            .map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]);
 
         // get any chunks from the ingester(s)
         let partitions_result = ingester_connection
             .partitions(
-                &shard_indexes,
+                shard_indexes,
                 self.namespace_id,
                 self.table_id,
                 columns,
diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs
index 4db57d78b3..c6532741c1 100644
--- a/querier/src/table/test_util.rs
+++ b/querier/src/table/test_util.rs
@@ -39,7 +39,9 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -> QuerierTable {
         .retention_period_ns
         .map(|retention| Duration::from_nanos(retention as u64));
     QuerierTable::new(QuerierTableArgs {
-        sharder: Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new))),
+        sharder: Some(Arc::new(JumpHash::new(
+            (0..1).map(ShardIndex::new).map(Arc::new),
+        ))),
         namespace_id: table.namespace.namespace.id,
         namespace_name,
         namespace_retention_period,
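Several call sites above switch from `Arc::clone(...)` on the sharder to calling `.clone()` on the field directly because it is now an `Option<Arc<...>>`: cloning the `Option` only bumps the inner `Arc`'s reference count when it is `Some`, so handing each `QuerierNamespace` and `QuerierTable` its own copy stays cheap on both paths. A small self-contained illustration of that property:

```rust
use std::sync::Arc;

fn main() {
    // Stand-in for the optional sharder: present on the write buffer path, None under rpc_write.
    let sharder: Option<Arc<Vec<u64>>> = Some(Arc::new(vec![1, 2, 3]));

    // Cloning the Option clones the Arc inside it (a refcount bump), not the underlying data.
    let per_table_copy = sharder.clone();

    if let (Some(a), Some(b)) = (&sharder, &per_table_copy) {
        assert!(Arc::ptr_eq(a, b)); // both handles point at the same allocation
        assert_eq!(Arc::strong_count(a), 2);
    }
}
```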