Merge pull request #6337 from influxdata/cn/ingester2-querier

feat: Add a mode for the querier to use ingester2 instead of the write buffer, behind the rpc_write feature flag
kodiakhq[bot] 2022-12-08 14:07:36 +00:00 committed by GitHub
commit 64aae97ce7
15 changed files with 233 additions and 72 deletions

View File

@ -33,3 +33,7 @@ test_helpers = { path = "../test_helpers" }
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support
aws = ["object_store/aws"] # Optional AWS / S3 object store support
# Temporary feature to use the RPC write path instead of the write buffer during the transition
# away from using Kafka.
rpc_write = []
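
The new feature is consumed through conditional compilation in the Rust sources below. As a minimal standalone sketch (not code from this PR; the function names are illustrative), this is the `#[cfg]` gating pattern the rest of the diff relies on:

#[cfg(feature = "rpc_write")]
fn ingest_path() -> &'static str {
    "RPC write path"
}

#[cfg(not(feature = "rpc_write"))]
fn ingest_path() -> &'static str {
    "write buffer path"
}

fn main() {
    // Which function exists is decided at build time, e.g. by
    // `cargo build --features rpc_write`.
    println!("querier is using the {}", ingest_path());
}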

View File

@ -1,8 +1,14 @@
//! Querier-related configs.
use data_types::{IngesterMapping, ShardIndex};
use snafu::Snafu;
use std::{collections::HashMap, io, path::PathBuf, sync::Arc};
#[cfg(not(feature = "rpc_write"))]
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, fs, io, path::PathBuf, sync::Arc};
#[cfg(not(feature = "rpc_write"))]
use snafu::ResultExt;
#[cfg(not(feature = "rpc_write"))]
use std::fs;
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@ -19,6 +25,7 @@ pub enum Error {
ingesters,
shards,
))]
#[cfg(not(feature = "rpc_write"))]
IgnoreAllRequiresEmptyConfig {
ingesters: HashMap<Arc<str>, Arc<IngesterConfig>>,
shards: HashMap<ShardIndex, ShardConfig>,
@ -130,6 +137,7 @@ pub struct QuerierConfig {
env = "INFLUXDB_IOX_SHARD_TO_INGESTERS_FILE",
action
)]
#[cfg(not(feature = "rpc_write"))]
pub shard_to_ingesters_file: Option<PathBuf>,
/// JSON containing a Shard index to ingesters gRPC mapping. For example:
@ -199,8 +207,27 @@ pub struct QuerierConfig {
env = "INFLUXDB_IOX_SHARD_TO_INGESTERS",
action
)]
#[cfg(not(feature = "rpc_write"))]
pub shard_to_ingesters: Option<String>,
/// gRPC addresses for the querier to talk with the ingesters. For
/// example:
///
/// "http://127.0.0.1:8083"
///
/// or
///
/// "http://10.10.10.1:8083,http://10.10.10.2:8083"
///
/// for multiple addresses.
#[clap(
long = "ingester-addresses",
env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
required = true
)]
#[cfg(feature = "rpc_write")]
pub ingester_addresses: Vec<String>,
/// Size of the RAM cache used to store catalog metadata information in bytes.
#[clap(
long = "ram-pool-metadata-bytes",
@ -261,6 +288,7 @@ impl QuerierConfig {
/// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to
/// specify a JSON file containing shard to ingester address mappings, this returns `Err` if
/// there are any problems reading, deserializing, or interpreting the file.
#[cfg(not(feature = "rpc_write"))]
pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
if let Some(file) = &self.shard_to_ingesters_file {
let contents =
@ -283,6 +311,24 @@ impl QuerierConfig {
}
}
/// Return the querier config's ingester addresses.
// When we have switched to the RPC write path and removed the rpc_write feature, this
// method can be made infallible, as clap will handle failures to parse the list of
// strings.
#[cfg(feature = "rpc_write")]
pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
if self.ingester_addresses.is_empty() {
Ok(IngesterAddresses::None)
} else {
Ok(IngesterAddresses::List(
self.ingester_addresses
.iter()
.map(|s| s.as_str().into())
.collect(),
))
}
}
/// Size of the RAM cache pool for metadata in bytes.
pub fn ram_pool_metadata_bytes(&self) -> usize {
self.ram_pool_metadata_bytes
@ -297,8 +343,21 @@ impl QuerierConfig {
pub fn max_concurrent_queries(&self) -> usize {
self.max_concurrent_queries
}
/// Whether the querier is contacting ingesters that use the RPC write path or not.
#[cfg(feature = "rpc_write")]
pub fn rpc_write(&self) -> bool {
true
}
/// Whether the querier is contacting ingesters that use the RPC write path or not.
#[cfg(not(feature = "rpc_write"))]
pub fn rpc_write(&self) -> bool {
false
}
}
#[cfg(not(feature = "rpc_write"))]
fn deserialize_shard_ingester_map(
contents: &str,
) -> Result<HashMap<ShardIndex, IngesterMapping>, Error> {
@ -375,12 +434,16 @@ pub enum IngesterAddresses {
/// A mapping from shard index to ingesters.
ByShardIndex(HashMap<ShardIndex, IngesterMapping>),
/// A list of ingester2 addresses.
List(Vec<Arc<str>>),
/// No connections, meaning only persisted data should be used.
None,
}
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
#[cfg(not(feature = "rpc_write"))]
struct IngestersConfig {
#[serde(default)]
ignore_all: bool,
@ -392,6 +455,7 @@ struct IngestersConfig {
/// Ingester config.
#[derive(Debug, Deserialize)]
#[cfg(not(feature = "rpc_write"))]
pub struct IngesterConfig {
addr: Option<Arc<str>>,
#[serde(default)]
@ -400,6 +464,7 @@ pub struct IngesterConfig {
/// Shard config.
#[derive(Debug, Deserialize)]
#[cfg(not(feature = "rpc_write"))]
pub struct ShardConfig {
ingester: Option<Arc<str>>,
#[serde(default)]
@ -407,6 +472,7 @@ pub struct ShardConfig {
}
#[cfg(test)]
#[cfg(not(feature = "rpc_write"))] // These tests won't be relevant after the switch to rpc_write.
mod tests {
use super::*;
use clap::Parser;
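
For reference, the conversion performed by the new rpc_write variant of `ingester_addresses()` can be sketched in isolation. This is a hedged, self-contained approximation: the enum below is a cut-down stand-in for the real `IngesterAddresses` (which also has a `ByShardIndex` variant), and only the mapping from the clap-provided `Vec<String>` to `Arc<str>` addresses is shown.

use std::sync::Arc;

#[derive(Debug)]
enum IngesterAddresses {
    List(Vec<Arc<str>>),
    None,
}

// Mirrors the body of the rpc_write `ingester_addresses()`: an empty flag
// list means "no ingesters, use only persisted data"; otherwise every
// String address becomes an Arc<str> in the List variant.
fn to_ingester_addresses(raw: &[String]) -> IngesterAddresses {
    if raw.is_empty() {
        IngesterAddresses::None
    } else {
        IngesterAddresses::List(raw.iter().map(|s| s.as_str().into()).collect())
    }
}

fn main() {
    let raw = vec![
        "http://10.10.10.1:8083".to_string(),
        "http://10.10.10.2:8083".to_string(),
    ];
    println!("{:?}", to_ingester_addresses(&raw));
    println!("{:?}", to_ingester_addresses(&[]));
}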

View File

@ -109,4 +109,4 @@ clippy = []
# Temporary feature to use the RPC write path instead of the write buffer during the transition
# away from using Kafka.
rpc_write = ["ioxd_router/rpc_write"]
rpc_write = ["ioxd_router/rpc_write", "clap_blocks/rpc_write"]

View File

@ -440,9 +440,13 @@ impl Config {
};
let querier_config = QuerierConfig {
num_query_threads: None, // will be ignored
num_query_threads: None, // will be ignored
#[cfg(not(feature = "rpc_write"))]
shard_to_ingesters_file: None, // will be ignored
shard_to_ingesters: None, // will be ignored
#[cfg(not(feature = "rpc_write"))]
shard_to_ingesters: None, // will be ignored
#[cfg(feature = "rpc_write")]
ingester_addresses: vec![], // will be ignored
ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes,
ram_pool_data_bytes: querier_ram_pool_data_bytes,
max_concurrent_queries: querier_max_concurrent_queries,

View File

@ -96,6 +96,12 @@ pub async fn command(config: Config) -> Result<(), Error> {
let num_threads = num_query_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
if config.querier_config.rpc_write() {
info!("using the RPC write path");
} else {
info!("using the write buffer path");
}
let ingester_addresses = config.querier_config.ingester_addresses()?;
info!(?ingester_addresses, "using ingester addresses");

View File

@ -15,7 +15,7 @@ use ioxd_common::{
use metric::Registry;
use object_store::DynObjectStore;
use querier::{
create_ingester_connections_by_shard, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
create_ingester_connections, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
};
use std::{
@ -188,8 +188,15 @@ pub async fn create_querier_server_type(
let ingester_connection = match args.ingester_addresses {
IngesterAddresses::None => None,
IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections_by_shard(
map,
IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections(
Some(map),
None,
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
)),
IngesterAddresses::List(list) => Some(create_ingester_connections(
None,
Some(list),
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
)),
@ -202,6 +209,7 @@ pub async fn create_querier_server_type(
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries(),
args.querier_config.rpc_write(),
)
.await?,
);

View File

@ -107,6 +107,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false,
)
.await
.unwrap(),
@ -142,6 +143,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false,
)
.await
.unwrap(),

View File

@ -69,7 +69,8 @@ pub struct QuerierDatabase {
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
/// Sharder to determine which ingesters to query for a particular table and namespace.
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
/// Only relevant when using the write buffer; will be None if using RPC write ingesters.
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
/// Chunk prune metrics.
prune_metrics: Arc<PruneMetrics>,
@ -106,6 +107,7 @@ impl QuerierDatabase {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
max_concurrent_queries: usize,
rpc_write: bool,
) -> Result<Self, Error> {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@ -128,9 +130,13 @@ impl QuerierDatabase {
let query_execution_semaphore =
Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
let sharder = Arc::new(
create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?,
);
let sharder = if rpc_write {
None
} else {
Some(Arc::new(
create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?,
))
};
let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry));
@ -172,7 +178,7 @@ impl QuerierDatabase {
Arc::clone(&self.exec),
self.ingester_connection.clone(),
Arc::clone(&self.query_log),
Arc::clone(&self.sharder),
self.sharder.clone(),
Arc::clone(&self.prune_metrics),
)))
}
@ -256,6 +262,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
false,
)
.await
.unwrap();
@ -280,6 +287,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false,
)
.await,
Error::NoShards
@ -305,6 +313,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false,
)
.await
.unwrap();
@ -334,6 +343,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false,
)
.await
.unwrap();

View File

@ -224,6 +224,7 @@ mod tests {
exec,
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false,
)
.await
.unwrap(),

View File

@ -139,9 +139,10 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Create a new set of connections given a map of shard indexes to Ingester configurations
pub fn create_ingester_connections_by_shard(
shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
/// Create a new set of connections given ingester configurations
pub fn create_ingester_connections(
shard_to_ingesters: Option<HashMap<ShardIndex, IngesterMapping>>,
ingester_addresses: Option<Vec<Arc<str>>>,
catalog_cache: Arc<CatalogCache>,
open_circuit_after_n_errors: u64,
) -> Arc<dyn IngesterConnection> {
@ -161,13 +162,29 @@ pub fn create_ingester_connections_by_shard(
deadline: None,
};
Arc::new(IngesterConnectionImpl::by_shard(
shard_to_ingesters,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
))
// Exactly one of `shard_to_ingesters` or `ingester_addresses` must be specified.
// `shard_to_ingesters` uses the Kafka write buffer path.
// `ingester_addresses` uses the RPC write path.
match (shard_to_ingesters, ingester_addresses) {
(None, None) => panic!("Neither shard_to_ingesters nor ingester_addresses was specified!"),
(Some(_), Some(_)) => {
panic!("Both shard_to_ingesters and ingester_addresses were specified!")
}
(Some(shard_to_ingesters), None) => Arc::new(IngesterConnectionImpl::by_shard(
shard_to_ingesters,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
)),
(None, Some(ingester_addresses)) => Arc::new(IngesterConnectionImpl::by_addrs(
ingester_addresses,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
)),
}
}
/// Create a new ingester suitable for testing
@ -187,7 +204,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
#[allow(clippy::too_many_arguments)]
async fn partitions(
&self,
shard_indexes: &[ShardIndex],
shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
@ -404,6 +421,36 @@ impl IngesterConnectionImpl {
backoff_config,
}
}
/// Create a new set of connections given a list of ingester2 addresses.
pub fn by_addrs(
ingester_addresses: Vec<Arc<str>>,
catalog_cache: Arc<CatalogCache>,
backoff_config: BackoffConfig,
circuit_breaker_backoff_config: BackoffConfig,
open_circuit_after_n_errors: u64,
) -> Self {
let flight_client = Arc::new(FlightClientImpl::new());
let flight_client = Arc::new(CircuitBreakerFlightClient::new(
flight_client,
catalog_cache.time_provider(),
catalog_cache.metric_registry(),
open_circuit_after_n_errors,
circuit_breaker_backoff_config,
));
let metric_registry = catalog_cache.metric_registry();
let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry));
Self {
shard_to_ingesters: HashMap::new(),
unique_ingester_addresses: ingester_addresses.into_iter().collect(),
flight_client,
catalog_cache,
metrics,
backoff_config,
}
}
}
/// Struct that names all parameters to `execute`
@ -726,7 +773,7 @@ impl IngesterConnection for IngesterConnectionImpl {
/// Retrieve chunks from the ingester for the particular table, shard, and predicate
async fn partitions(
&self,
shard_indexes: &[ShardIndex],
shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
@ -734,12 +781,49 @@ impl IngesterConnection for IngesterConnectionImpl {
expected_schema: Arc<Schema>,
span: Option<Span>,
) -> Result<Vec<IngesterPartition>> {
// If no shard indexes are specified, no ingester addresses can be found. This is a
// configuration problem somewhere.
assert!(
!shard_indexes.is_empty(),
"Called `IngesterConnection.partitions` with an empty `shard_indexes` list",
);
let relevant_ingester_addresses = match shard_indexes {
// If shard indexes is None, we're using the RPC write path, and all ingesters should
// be queried.
None => self.unique_ingester_addresses.clone(),
// If shard indexes is Some([]), no ingester addresses can be found. This is a
// configuration problem somewhere.
Some(shard_indexes) if shard_indexes.is_empty() => {
panic!("Called `IngesterConnection.partitions` with an empty `shard_indexes` list");
}
// Otherwise, we're using the write buffer and need to look up the ingesters to contact
// by their shard index.
Some(shard_indexes) => {
// Look up the ingesters needed for the shard. Collect into a HashSet to avoid making
// multiple requests to the same ingester if that ingester is responsible for multiple
// shard_indexes relevant to this query.
let mut relevant_ingester_addresses = HashSet::new();
for shard_index in &shard_indexes {
match self.shard_to_ingesters.get(shard_index) {
None => {
return NoIngesterFoundForShardSnafu {
shard_index: *shard_index,
}
.fail()
}
Some(mapping) => match mapping {
IngesterMapping::Addr(addr) => {
relevant_ingester_addresses.insert(Arc::clone(addr));
}
IngesterMapping::Ignore => (),
IngesterMapping::NotMapped => {
return ShardNotMappedSnafu {
shard_index: *shard_index,
}
.fail()
}
},
}
}
relevant_ingester_addresses
}
};
let mut span_recorder = SpanRecorder::new(span);
let metrics = Arc::clone(&self.metrics);
@ -797,34 +881,6 @@ impl IngesterConnection for IngesterConnectionImpl {
}
};
// Look up the ingesters needed for the shard. Collect into a HashSet to avoid making
// multiple requests to the same ingester if that ingester is responsible for multiple
// shard_indexes relevant to this query.
let mut relevant_ingester_addresses = HashSet::new();
for shard_index in shard_indexes {
match self.shard_to_ingesters.get(shard_index) {
None => {
return NoIngesterFoundForShardSnafu {
shard_index: *shard_index,
}
.fail()
}
Some(mapping) => match mapping {
IngesterMapping::Addr(addr) => {
relevant_ingester_addresses.insert(Arc::clone(addr));
}
IngesterMapping::Ignore => (),
IngesterMapping::NotMapped => {
return ShardNotMappedSnafu {
shard_index: *shard_index,
}
.fail()
}
},
}
}
let mut ingester_partitions: Vec<IngesterPartition> = relevant_ingester_addresses
.into_iter()
.map(move |ingester_address| measured_ingester_request(ingester_address))
@ -1779,7 +1835,7 @@ mod tests {
let shard_indexes: Vec<_> = shard_indexes.iter().copied().map(ShardIndex::new).collect();
ingester_conn
.partitions(
&shard_indexes,
Some(shard_indexes),
NamespaceId::new(1),
TableId::new(2),
columns,
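
Taken together, the new signature means callers pass exactly one of the two `Option` arguments; passing both or neither panics, as the match above shows. A hedged sketch of the two valid call shapes as thin wrappers (the wrapper names and the threshold of 10 are illustrative; imports of the crate-internal items `CatalogCache`, `IngesterMapping`, `ShardIndex`, `IngesterConnection`, and `create_ingester_connections` are omitted):

use std::{collections::HashMap, sync::Arc};

// Write buffer mode: a shard-to-ingester map, no flat address list.
fn connect_by_shard(
    shard_map: HashMap<ShardIndex, IngesterMapping>,
    catalog_cache: Arc<CatalogCache>,
) -> Arc<dyn IngesterConnection> {
    create_ingester_connections(Some(shard_map), None, catalog_cache, 10)
}

// RPC write mode: a flat list of ingester2 addresses, no shard map.
fn connect_by_addrs(
    addresses: Vec<Arc<str>>,
    catalog_cache: Arc<CatalogCache>,
) -> Arc<dyn IngesterConnection> {
    create_ingester_connections(None, Some(addresses), catalog_cache, 10)
}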

View File

@ -34,7 +34,7 @@ impl MockIngesterConnection {
impl IngesterConnection for MockIngesterConnection {
async fn partitions(
&self,
_shard_indexes: &[ShardIndex],
_shard_indexes: Option<Vec<ShardIndex>>,
_namespace_id: NamespaceId,
_table_id: TableId,
columns: Vec<String>,

View File

@ -28,7 +28,7 @@ pub use cache::CatalogCache as QuerierCatalogCache;
pub use database::{Error as QuerierDatabaseError, QuerierDatabase};
pub use handler::{QuerierHandler, QuerierHandlerImpl};
pub use ingester::{
create_ingester_connection_for_testing, create_ingester_connections_by_shard,
create_ingester_connection_for_testing, create_ingester_connections,
flight_client::{
Error as IngesterFlightClientError, FlightClient as IngesterFlightClient,
QueryData as IngesterFlightClientQueryData,

View File

@ -58,7 +58,7 @@ impl QuerierNamespace {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
query_log: Arc<QueryLog>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
prune_metrics: Arc<PruneMetrics>,
) -> Self {
let tables: HashMap<_, _> = ns
@ -66,7 +66,7 @@ impl QuerierNamespace {
.iter()
.map(|(table_name, cached_table)| {
let table = Arc::new(QuerierTable::new(QuerierTableArgs {
sharder: Arc::clone(&sharder),
sharder: sharder.clone(),
namespace_id: ns.id,
namespace_name: Arc::clone(&name),
namespace_retention_period: ns.retention_period,
@ -118,7 +118,7 @@ impl QuerierNamespace {
exec,
ingester_connection,
query_log,
sharder,
Some(sharder),
prune_metrics,
)
}

View File

@ -78,7 +78,7 @@ impl From<Error> for DataFusionError {
/// Args to create a [`QuerierTable`].
pub struct QuerierTableArgs {
pub sharder: Arc<JumpHash<Arc<ShardIndex>>>,
pub sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
pub namespace_id: NamespaceId,
pub namespace_name: Arc<str>,
pub namespace_retention_period: Option<Duration>,
@ -94,8 +94,9 @@ pub struct QuerierTableArgs {
/// Table representation for the querier.
#[derive(Debug)]
pub struct QuerierTable {
/// Sharder to query for which shards are responsible for the table's data
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
/// Sharder to query for which shards are responsible for the table's data. If not specified,
/// query all ingesters.
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
/// Namespace the table is in
namespace_name: Arc<str>,
@ -481,14 +482,15 @@ impl QuerierTable {
// determine which ingester(s) to query.
// Currently, the sharder will only return one shard index per table, but in the
// near future, the sharder might return more than one shard index for one table.
let shard_indexes = vec![**self
let shard_indexes = self
.sharder
.shard_for_query(&self.table_name, &self.namespace_name)];
.as_ref()
.map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]);
// get any chunks from the ingester(s)
let partitions_result = ingester_connection
.partitions(
&shard_indexes,
shard_indexes,
self.namespace_id,
self.table_id,
columns,
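
The `Option` introduced here threads all the way into `IngesterConnection::partitions`: `None` (no sharder, rpc_write mode) means every configured ingester is queried, while `Some(indexes)` (write buffer mode) is resolved to ingester addresses per shard. A simplified standalone sketch of that dispatch, with integers standing in for `ShardIndex` and strings for addresses; unlike the real `partitions`, unmapped shards are simply skipped here instead of returning an error:

use std::collections::HashSet;

fn relevant_addresses(
    shard_indexes: Option<Vec<u32>>,
    all_addresses: &HashSet<String>,
    shard_to_ingester: impl Fn(u32) -> Option<String>,
) -> HashSet<String> {
    match shard_indexes {
        // RPC write mode: no sharder, so all configured ingesters are relevant.
        None => all_addresses.clone(),
        // Write buffer mode: resolve each shard index to its ingester address;
        // the HashSet deduplicates ingesters that own several shards.
        Some(indexes) => indexes.into_iter().filter_map(shard_to_ingester).collect(),
    }
}

fn main() {
    let all: HashSet<String> = ["http://10.10.10.1:8083".to_string()]
        .into_iter()
        .collect();
    // With no shard indexes (rpc_write mode), every ingester is queried.
    assert_eq!(relevant_addresses(None, &all, |_| None), all);
    // With shard indexes, only the mapped ingesters remain.
    let mapped = relevant_addresses(Some(vec![1, 2]), &all, |i| {
        (i == 1).then(|| "http://10.10.10.1:8083".to_string())
    });
    assert_eq!(mapped.len(), 1);
}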

View File

@ -39,7 +39,9 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
.retention_period_ns
.map(|retention| Duration::from_nanos(retention as u64));
QuerierTable::new(QuerierTableArgs {
sharder: Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new))),
sharder: Some(Arc::new(JumpHash::new(
(0..1).map(ShardIndex::new).map(Arc::new),
))),
namespace_id: table.namespace.namespace.id,
namespace_name,
namespace_retention_period,