From 9166ace7964dde7d0952af21f4231d8f8d2a3f3c Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 5 Dec 2022 12:27:52 -0500 Subject: [PATCH 1/3] feat: Make a mode for the querier to use ingester2 instead, behind the rpc_write feature flag --- Cargo.lock | 1 + clap_blocks/src/lib.rs | 1 + clap_blocks/src/querier_rpc_write.rs | 94 ++++++++ influxdb_iox/src/commands/run/mod.rs | 16 ++ .../src/commands/run/querier_rpc_write.rs | 119 ++++++++++ ioxd_querier/Cargo.toml | 1 + ioxd_querier/src/lib.rs | 99 +++++++- ioxd_querier/src/rpc/namespace.rs | 18 +- ioxd_querier/src/rpc/query.rs | 8 +- ioxd_querier/src/rpc/write_info.rs | 16 +- querier/src/database.rs | 216 ++++++++++++++++-- querier/src/handler.rs | 6 +- querier/src/ingester/mod.rs | 197 +++++++++++++++- querier/src/ingester/test_util.rs | 2 +- querier/src/lib.rs | 7 +- querier/src/namespace/mod.rs | 6 +- querier/src/table/mod.rs | 14 +- 17 files changed, 751 insertions(+), 70 deletions(-) create mode 100644 clap_blocks/src/querier_rpc_write.rs create mode 100644 influxdb_iox/src/commands/run/querier_rpc_write.rs diff --git a/Cargo.lock b/Cargo.lock index 7f8506a014..84b631d974 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2827,6 +2827,7 @@ dependencies = [ "parquet_file", "querier", "router", + "service_common", "service_grpc_flight", "service_grpc_influxrpc", "sharder", diff --git a/clap_blocks/src/lib.rs b/clap_blocks/src/lib.rs index 09052ccad5..02fe530a74 100644 --- a/clap_blocks/src/lib.rs +++ b/clap_blocks/src/lib.rs @@ -18,6 +18,7 @@ pub mod ingester; pub mod ingester2; pub mod object_store; pub mod querier; +pub mod querier_rpc_write; pub mod router; pub mod router_rpc_write; pub mod run_config; diff --git a/clap_blocks/src/querier_rpc_write.rs b/clap_blocks/src/querier_rpc_write.rs new file mode 100644 index 0000000000..c87b88fecd --- /dev/null +++ b/clap_blocks/src/querier_rpc_write.rs @@ -0,0 +1,94 @@ +//! Querier-related config for the RPC write path. + +/// CLI config for the querier using the RPC write path +#[derive(Debug, Clone, PartialEq, Eq, clap::Parser)] +pub struct QuerierRpcWriteConfig { + /// The number of threads to use for queries. + /// + /// If not specified, defaults to the number of cores on the system + #[clap( + long = "num-query-threads", + env = "INFLUXDB_IOX_NUM_QUERY_THREADS", + action + )] + pub num_query_threads: Option, + + /// Size of memory pool used during query exec, in bytes. + /// + /// If queries attempt to allocate more than this many bytes + /// during execution, they will error with "ResourcesExhausted". + #[clap( + long = "exec-mem-pool-bytes", + env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", + default_value = "8589934592", // 8GB + action + )] + pub exec_mem_pool_bytes: usize, + + /// gRPC address for the router to talk with the ingesters. For + /// example: + /// + /// "http://127.0.0.1:8083" + /// + /// or + /// + /// "http://10.10.10.1:8083,http://10.10.10.2:8083" + /// + /// for multiple addresses. + #[clap( + long = "ingester-addresses", + env = "INFLUXDB_IOX_INGESTER_ADDRESSES", + required = true + )] + pub ingester_addresses: Vec, + + /// Size of the RAM cache used to store catalog metadata information in bytes. + #[clap( + long = "ram-pool-metadata-bytes", + env = "INFLUXDB_IOX_RAM_POOL_METADATA_BYTES", + default_value = "134217728", // 128MB + action + )] + pub ram_pool_metadata_bytes: usize, + + /// Size of the RAM cache used to store data in bytes. 
+ #[clap( + long = "ram-pool-data-bytes", + env = "INFLUXDB_IOX_RAM_POOL_DATA_BYTES", + default_value = "1073741824", // 1GB + action + )] + pub ram_pool_data_bytes: usize, + + /// Limit the number of concurrent queries. + #[clap( + long = "max-concurrent-queries", + env = "INFLUXDB_IOX_MAX_CONCURRENT_QUERIES", + default_value = "10", + action + )] + pub max_concurrent_queries: usize, + + /// After how many ingester query errors should the querier enter circuit breaker mode? + /// + /// The querier normally contacts the ingester for any unpersisted data during query planning. + /// However, when the ingester can not be contacted for some reason, the querier will begin + /// returning results that do not include unpersisted data and enter "circuit breaker mode" + /// to avoid continually retrying the failing connection on subsequent queries. + /// + /// If circuits are open, the querier will NOT contact the ingester and no unpersisted data + /// will be presented to the user. + /// + /// Circuits will switch to "half open" after some jittered timeout and the querier will try to + /// use the ingester in question again. If this succeeds, we are back to normal, otherwise it + /// will back off exponentially before trying again (and again ...). + /// + /// In a production environment the `ingester_circuit_state` metric should be monitored. + #[clap( + long = "ingester-circuit-breaker-threshold", + env = "INFLUXDB_IOX_INGESTER_CIRCUIT_BREAKER_THRESHOLD", + default_value = "10", + action + )] + pub ingester_circuit_breaker_threshold: u64, +} diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 0da818e318..0e9b8d032c 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -9,6 +9,8 @@ mod ingester; mod ingester2; mod main; mod querier; +#[cfg(feature = "rpc_write")] +mod querier_rpc_write; mod router; #[cfg(feature = "rpc_write")] mod router_rpc_write; @@ -26,6 +28,10 @@ pub enum Error { #[snafu(display("Error in querier subcommand: {}", source))] QuerierError { source: querier::Error }, + #[cfg(feature = "rpc_write")] + #[snafu(display("Error in querier-rpc-write subcommand: {}", source))] + QuerierRpcWriteError { source: querier_rpc_write::Error }, + #[snafu(display("Error in router subcommand: {}", source))] RouterError { source: router::Error }, @@ -66,6 +72,8 @@ impl Config { Some(Command::Compactor(config)) => config.run_config.logging_config(), Some(Command::GarbageCollector(config)) => config.run_config.logging_config(), Some(Command::Querier(config)) => config.run_config.logging_config(), + #[cfg(feature = "rpc_write")] + Some(Command::QuerierRpcWrite(config)) => config.run_config.logging_config(), Some(Command::Router(config)) => config.run_config.logging_config(), #[cfg(feature = "rpc_write")] Some(Command::RouterRpcWrite(config)) => config.run_config.logging_config(), @@ -86,6 +94,10 @@ enum Command { /// Run the server in querier mode Querier(querier::Config), + /// Run the server in querier mode using the RPC write path. 
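An aside on the new config block: `QuerierRpcWriteConfig` is a plain `clap` derive struct, so it can be exercised in isolation. A minimal sketch, assuming the struct is public as declared in this patch; `parse_from` feeds a simulated argv, and `--ingester-addresses` is the only required flag:

    use clap::Parser;
    use clap_blocks::querier_rpc_write::QuerierRpcWriteConfig;

    fn main() {
        // Everything except --ingester-addresses falls back to its
        // declared default_value.
        let cfg = QuerierRpcWriteConfig::parse_from([
            "querier",
            "--ingester-addresses",
            "http://127.0.0.1:8083",
        ]);
        assert_eq!(cfg.max_concurrent_queries, 10);
        assert_eq!(cfg.ingester_circuit_breaker_threshold, 10);
    }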
+ #[cfg(feature = "rpc_write")] + QuerierRpcWrite(querier_rpc_write::Config), + /// Run the server in router mode Router(router::Config), @@ -122,6 +134,10 @@ pub async fn command(config: Config) -> Result<()> { .await .context(GarbageCollectorSnafu), Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu), + #[cfg(feature = "rpc_write")] + Some(Command::QuerierRpcWrite(config)) => querier_rpc_write::command(config) + .await + .context(QuerierRpcWriteSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), #[cfg(feature = "rpc_write")] Some(Command::RouterRpcWrite(config)) => router_rpc_write::command(config) diff --git a/influxdb_iox/src/commands/run/querier_rpc_write.rs b/influxdb_iox/src/commands/run/querier_rpc_write.rs new file mode 100644 index 0000000000..975ff36443 --- /dev/null +++ b/influxdb_iox/src/commands/run/querier_rpc_write.rs @@ -0,0 +1,119 @@ +//! Command line options for running the querier using the RPC write path. + +use super::main; +use crate::process_info::setup_metric_registry; +use clap_blocks::{ + catalog_dsn::CatalogDsnConfig, object_store::make_object_store, + querier_rpc_write::QuerierRpcWriteConfig, run_config::RunConfig, +}; +use iox_query::exec::Executor; +use iox_time::{SystemProvider, TimeProvider}; +use ioxd_common::{ + server_type::{CommonServerState, CommonServerStateError}, + Service, +}; +use ioxd_querier::{create_querier_grpc_write_server_type, QuerierRpcWriteServerTypeArgs}; +use object_store::DynObjectStore; +use object_store_metrics::ObjectStoreMetrics; +use observability_deps::tracing::*; +use std::sync::Arc; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Run: {0}")] + Run(#[from] main::Error), + + #[error("Invalid config: {0}")] + InvalidConfigCommon(#[from] CommonServerStateError), + + #[error("Catalog error: {0}")] + Catalog(#[from] iox_catalog::interface::Error), + + #[error("Catalog DSN error: {0}")] + CatalogDsn(#[from] clap_blocks::catalog_dsn::Error), + + #[error("Cannot parse object store config: {0}")] + ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), + + #[error("Querier error: {0}")] + Querier(#[from] ioxd_querier::Error), +} + +#[derive(Debug, clap::Parser)] +#[clap( + name = "run", + about = "Runs in querier mode using the RPC write path", + long_about = "Run the IOx querier server.\n\nThe configuration options below can be \ + set either with the command line flags or with the specified environment \ + variable. If there is a file named '.env' in the current working directory, \ + it is sourced before loading the configuration. 
+ +Configuration is loaded from the following sources (highest precedence first): + - command line arguments + - user set environment variables + - .env file contents + - pre-configured default values" +)] +pub struct Config { + #[clap(flatten)] + pub(crate) run_config: RunConfig, + + #[clap(flatten)] + pub(crate) catalog_dsn: CatalogDsnConfig, + + #[clap(flatten)] + pub(crate) querier_config: QuerierRpcWriteConfig, +} + +pub async fn command(config: Config) -> Result<(), Error> { + let common_state = CommonServerState::from_config(config.run_config.clone())?; + + let time_provider = Arc::new(SystemProvider::new()) as Arc; + let metric_registry = setup_metric_registry(); + + let catalog = config + .catalog_dsn + .get_catalog("querier_rpc_write", Arc::clone(&metric_registry)) + .await?; + + let object_store = make_object_store(config.run_config.object_store_config()) + .map_err(Error::ObjectStoreParsing)?; + // Decorate the object store with a metric recorder. + let object_store: Arc = Arc::new(ObjectStoreMetrics::new( + object_store, + Arc::clone(&time_provider), + &metric_registry, + )); + + let time_provider = Arc::new(SystemProvider::new()); + + let num_threads = config + .querier_config + .num_query_threads + .unwrap_or_else(num_cpus::get); + info!(%num_threads, "using specified number of threads per thread pool"); + + info!(ingester_addresses=?config.querier_config.ingester_addresses, "using ingester addresses"); + + let exec = Arc::new(Executor::new( + num_threads, + config.querier_config.exec_mem_pool_bytes, + )); + + let server_type = create_querier_grpc_write_server_type(QuerierRpcWriteServerTypeArgs { + common_state: &common_state, + metric_registry: Arc::clone(&metric_registry), + catalog, + object_store, + exec, + time_provider, + querier_config: config.querier_config, + }) + .await?; + + info!("starting querier"); + + let services = vec![Service::create(server_type, common_state.run_config())]; + Ok(main::main(common_state, services, metric_registry).await?) 
+} diff --git a/ioxd_querier/Cargo.toml b/ioxd_querier/Cargo.toml index 17c60fff2a..aa9c298c0c 100644 --- a/ioxd_querier/Cargo.toml +++ b/ioxd_querier/Cargo.toml @@ -17,6 +17,7 @@ object_store = "0.5.2" querier = { path = "../querier" } iox_query = { path = "../iox_query" } router = { path = "../router" } +service_common = { path = "../service_common" } service_grpc_flight = { path = "../service_grpc_flight" } service_grpc_influxrpc = { path = "../service_grpc_influxrpc" } sharder = { path = "../sharder" } diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index c833999293..100fb7663b 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -1,5 +1,8 @@ use async_trait::async_trait; -use clap_blocks::querier::{IngesterAddresses, QuerierConfig}; +use clap_blocks::{ + querier::{IngesterAddresses, QuerierConfig}, + querier_rpc_write::QuerierRpcWriteConfig, +}; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use iox_query::exec::{Executor, ExecutorType}; @@ -15,9 +18,11 @@ use ioxd_common::{ use metric::Registry; use object_store::DynObjectStore; use querier::{ - create_ingester_connections_by_shard, QuerierCatalogCache, QuerierDatabase, QuerierHandler, - QuerierHandlerImpl, QuerierServer, + create_ingester2_connection, create_ingester_connections_by_shard, Database, + QuerierCatalogCache, QuerierDatabase, QuerierHandler, QuerierHandlerImpl, + QuerierRpcWriteDatabase, QuerierServer, }; +use service_common::QueryNamespaceProvider; use std::{ fmt::{Debug, Display}, sync::Arc, @@ -28,22 +33,24 @@ use trace::TraceCollector; mod rpc; -pub struct QuerierServerType { - database: Arc, +pub struct QuerierServerType { + database: Arc, server: QuerierServer, trace_collector: Option>, } -impl std::fmt::Debug for QuerierServerType { +impl std::fmt::Debug + for QuerierServerType +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Querier") } } -impl QuerierServerType { +impl QuerierServerType { pub fn new( server: QuerierServer, - database: Arc, + database: Arc, common_state: &CommonServerState, ) -> Self { Self { @@ -55,7 +62,9 @@ impl QuerierServerType { } #[async_trait] -impl ServerType for QuerierServerType { +impl ServerType + for QuerierServerType +{ /// Return the [`metric::Registry`] used by the compactor. fn metric_registry(&self) -> Arc { self.server.metric_registry() @@ -139,6 +148,18 @@ impl HttpApiErrorSource for IoxHttpError { } } +/// Arguments required to create a [`ServerType`] for the querier using the RPC write path. +#[derive(Debug)] +pub struct QuerierRpcWriteServerTypeArgs<'a> { + pub common_state: &'a CommonServerState, + pub metric_registry: Arc, + pub catalog: Arc, + pub object_store: Arc, + pub exec: Arc, + pub time_provider: Arc, + pub querier_config: QuerierRpcWriteConfig, +} + /// Arguments required to create a [`ServerType`] for the querier. 
#[derive(Debug)] pub struct QuerierServerTypeArgs<'a> { @@ -158,6 +179,64 @@ pub enum Error { Querier(#[from] querier::QuerierDatabaseError), } +/// Instantiate a querier server that uses the RPC write path +pub async fn create_querier_grpc_write_server_type( + args: QuerierRpcWriteServerTypeArgs<'_>, +) -> Result, Error> { + let catalog_cache = Arc::new(QuerierCatalogCache::new( + Arc::clone(&args.catalog), + args.time_provider, + Arc::clone(&args.metric_registry), + Arc::clone(&args.object_store), + args.querier_config.ram_pool_metadata_bytes, + args.querier_config.ram_pool_data_bytes, + &Handle::current(), + )); + + // register cached object store with the execution context + let parquet_store = catalog_cache.parquet_store(); + let existing = args + .exec + .new_context(ExecutorType::Query) + .inner() + .runtime_env() + .register_object_store( + "iox", + parquet_store.id(), + Arc::clone(parquet_store.object_store()), + ); + assert!(existing.is_none()); + + let ingester_connection = create_ingester2_connection( + &args.querier_config.ingester_addresses, + Arc::clone(&catalog_cache), + args.querier_config.ingester_circuit_breaker_threshold, + ); + + let database = Arc::new( + QuerierRpcWriteDatabase::new( + catalog_cache, + Arc::clone(&args.metric_registry), + args.exec, + ingester_connection, + args.querier_config.max_concurrent_queries, + ) + .await?, + ); + let querier_handler = Arc::new(QuerierHandlerImpl::new( + args.catalog, + Arc::clone(&database) as _, + Arc::clone(&args.object_store), + )); + + let querier = QuerierServer::new(args.metric_registry, querier_handler); + Ok(Arc::new(QuerierServerType::new( + querier, + database, + args.common_state, + ))) +} + /// Instantiate a querier server pub async fn create_querier_server_type( args: QuerierServerTypeArgs<'_>, @@ -207,7 +286,7 @@ pub async fn create_querier_server_type( ); let querier_handler = Arc::new(QuerierHandlerImpl::new( args.catalog, - Arc::clone(&database), + Arc::clone(&database) as _, Arc::clone(&args.object_store), )); diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 268627d512..9a0a9803b7 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -7,12 +7,12 @@ use data_types::Namespace; use generated_types::influxdata::iox::namespace::v1 as proto; -use querier::QuerierDatabase; +use querier::Database; use std::sync::Arc; /// Acquire a [`NamespaceService`](proto::namespace_service_server::NamespaceService) gRPC service implementation. 
-pub fn namespace_service( - server: Arc, +pub fn namespace_service( + server: Arc, ) -> proto::namespace_service_server::NamespaceServiceServer< impl proto::namespace_service_server::NamespaceService, > { @@ -20,12 +20,12 @@ pub fn namespace_service( } #[derive(Debug)] -struct NamespaceServiceImpl { - server: Arc, +struct NamespaceServiceImpl { + server: Arc, } -impl NamespaceServiceImpl { - pub fn new(server: Arc) -> Self { +impl NamespaceServiceImpl { + pub fn new(server: Arc) -> Self { Self { server } } } @@ -40,7 +40,9 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace { } #[tonic::async_trait] -impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl { +impl proto::namespace_service_server::NamespaceService + for NamespaceServiceImpl +{ async fn get_namespaces( &self, _request: tonic::Request, diff --git a/ioxd_querier/src/rpc/query.rs b/ioxd_querier/src/rpc/query.rs index 9a55b82af3..d578cdc4d3 100644 --- a/ioxd_querier/src/rpc/query.rs +++ b/ioxd_querier/src/rpc/query.rs @@ -4,12 +4,14 @@ use arrow_flight::flight_service_server::{ FlightService as Flight, FlightServiceServer as FlightServer, }; use generated_types::storage_server::{Storage, StorageServer}; -use querier::QuerierDatabase; +use service_common::QueryNamespaceProvider; -pub fn make_flight_server(server: Arc) -> FlightServer { +pub fn make_flight_server(server: Arc) -> FlightServer { service_grpc_flight::make_server(server) } -pub fn make_storage_server(server: Arc) -> StorageServer { +pub fn make_storage_server( + server: Arc, +) -> StorageServer { service_grpc_influxrpc::make_server(server) } diff --git a/ioxd_querier/src/rpc/write_info.rs b/ioxd_querier/src/rpc/write_info.rs index 74a804a3f9..bbd3ff9491 100644 --- a/ioxd_querier/src/rpc/write_info.rs +++ b/ioxd_querier/src/rpc/write_info.rs @@ -4,29 +4,29 @@ use generated_types::influxdata::iox::ingester::v1::{ self as proto, write_info_service_server::{WriteInfoService, WriteInfoServiceServer}, }; -use querier::QuerierDatabase; +use querier::Database; use std::sync::Arc; /// Acquire a [`WriteInfoService`] gRPC service implementation. 
-pub fn write_info_service( - server: Arc, +pub fn write_info_service( + server: Arc, ) -> WriteInfoServiceServer { WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server)) } #[derive(Debug)] -struct QuerierWriteInfoServiceImpl { - server: Arc, +struct QuerierWriteInfoServiceImpl { + server: Arc, } -impl QuerierWriteInfoServiceImpl { - pub fn new(server: Arc) -> Self { +impl QuerierWriteInfoServiceImpl { + pub fn new(server: Arc) -> Self { Self { server } } } #[tonic::async_trait] -impl WriteInfoService for QuerierWriteInfoServiceImpl { +impl WriteInfoService for QuerierWriteInfoServiceImpl { async fn get_write_info( &self, request: tonic::Request, diff --git a/querier/src/database.rs b/querier/src/database.rs index 0f234c3ea4..a5a4be72b9 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -12,7 +12,7 @@ use iox_query::exec::Executor; use service_common::QueryNamespaceProvider; use sharder::JumpHash; use snafu::Snafu; -use std::{collections::BTreeSet, sync::Arc}; +use std::{collections::BTreeSet, fmt::Debug, sync::Arc}; use trace::span::{Span, SpanRecorder}; use tracker::{ AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, @@ -34,6 +34,19 @@ pub enum Error { NoShards, } +/// Shared Querier Database behavior whether using the RPC write path or not +#[async_trait] +pub trait Database: Debug + Send + Sync + 'static { + /// Executor access + fn exec(&self) -> &Executor; + + /// Return all namespaces this querier knows about + async fn namespaces(&self) -> Vec; + + /// Return connection to ingester(s) to get and aggregate information from them + fn ingester_connection(&self) -> Option>; +} + /// Database for the querier. /// /// Contains all namespaces. @@ -91,6 +104,27 @@ impl QueryNamespaceProvider for QuerierDatabase { } } +#[async_trait] +impl Database for QuerierDatabase { + fn exec(&self) -> &Executor { + &self.exec + } + + async fn namespaces(&self) -> Vec { + let catalog = &self.catalog_cache.catalog(); + Backoff::new(&self.backoff_config) + .retry_all_errors("listing namespaces", || async { + catalog.repositories().await.namespaces().list().await + }) + .await + .expect("retry forever") + } + + fn ingester_connection(&self) -> Option> { + self.ingester_connection.clone() + } +} + impl QuerierDatabase { /// The maximum value for `max_concurrent_queries` that is allowed. /// @@ -172,31 +206,10 @@ impl QuerierDatabase { Arc::clone(&self.exec), self.ingester_connection.clone(), Arc::clone(&self.query_log), - Arc::clone(&self.sharder), + Some(Arc::clone(&self.sharder)), Arc::clone(&self.prune_metrics), ))) } - - /// Return all namespaces this querier knows about - pub async fn namespaces(&self) -> Vec { - let catalog = &self.catalog_cache.catalog(); - Backoff::new(&self.backoff_config) - .retry_all_errors("listing namespaces", || async { - catalog.repositories().await.namespaces().list().await - }) - .await - .expect("retry forever") - } - - /// Return connection to ingester(s) to get and aggregate information from them - pub fn ingester_connection(&self) -> Option> { - self.ingester_connection.clone() - } - - /// Executor - pub(crate) fn exec(&self) -> &Executor { - &self.exec - } } pub async fn create_sharder( @@ -228,6 +241,163 @@ pub async fn create_sharder( Ok(JumpHash::new(shard_indexes.into_iter().map(Arc::new))) } +/// Database for the querier using the RPC write path. +/// +/// Contains all namespaces. +#[derive(Debug)] +pub struct QuerierRpcWriteDatabase { + /// Backoff config for IO operations. 
+    backoff_config: BackoffConfig,
+
+    /// Catalog cache.
+    catalog_cache: Arc<CatalogCache>,
+
+    /// Adapter to create chunks.
+    chunk_adapter: Arc<ChunkAdapter>,
+
+    /// Metric registry
+    #[allow(dead_code)]
+    metric_registry: Arc<metric::Registry>,
+
+    /// Executor for queries.
+    exec: Arc<Executor>,
+
+    /// Connection to ingester(s)
+    ingester_connection: Option<Arc<dyn IngesterConnection>>,
+
+    /// Query log.
+    query_log: Arc<QueryLog>,
+
+    /// Semaphore that limits the number of namespaces in use at a time by the query subsystem.
+    ///
+    /// This should be a 1-to-1 relation to the number of active queries.
+    ///
+    /// If the same namespace is requested twice for different queries, it is counted twice.
+    query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
+
+    /// Chunk prune metrics.
+    prune_metrics: Arc<PruneMetrics>,
+}
+
+impl QuerierRpcWriteDatabase {
+    /// The maximum value for `max_concurrent_queries` that is allowed.
+    ///
+    /// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore
+    /// creation beyond that will panic. The tokio limit is not exposed though so we pick a
+    /// reasonable but smaller number.
+    pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize;
+
+    /// Create new database.
+    pub async fn new(
+        catalog_cache: Arc<CatalogCache>,
+        metric_registry: Arc<metric::Registry>,
+        exec: Arc<Executor>,
+        ingester_connection: Option<Arc<dyn IngesterConnection>>,
+        max_concurrent_queries: usize,
+    ) -> Result<Self, Error> {
+        assert!(
+            max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
+            "`max_concurrent_queries` ({}) > `MAX_CONCURRENT_QUERIES_MAX` ({})",
+            max_concurrent_queries,
+            Self::MAX_CONCURRENT_QUERIES_MAX,
+        );
+
+        let backoff_config = BackoffConfig::default();
+
+        let chunk_adapter = Arc::new(ChunkAdapter::new(
+            Arc::clone(&catalog_cache),
+            Arc::clone(&metric_registry),
+        ));
+        let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider()));
+        let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
+            &metric_registry,
+            &[("semaphore", "query_execution")],
+        ));
+        let query_execution_semaphore =
+            Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
+
+        let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry));
+
+        Ok(Self {
+            backoff_config,
+            catalog_cache,
+            chunk_adapter,
+            metric_registry,
+            exec,
+            ingester_connection,
+            query_log,
+            query_execution_semaphore,
+            prune_metrics,
+        })
+    }
+
+    /// Get namespace if it exists.
+    ///
+    /// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER
+    /// a semaphore permit was acquired since this lowers the chance that we obtain stale data.
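The ordering described in the comment above can be reduced to a few lines. A simplified sketch of the acquire-then-check pattern, using a plain `tokio::sync::Semaphore` in place of the instrumented semaphore and a hypothetical `lookup_namespace` standing in for the catalog-cache lookup:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    async fn namespace_for_query(sem: Arc<Semaphore>, name: &str) -> Option<String> {
        // Take a permit first: this bounds query concurrency before any
        // catalog work happens.
        let _permit = sem.acquire_owned().await.expect("semaphore closed");
        // Only then check that the namespace exists, so the lookup runs as
        // close to query execution as possible and is less likely to be
        // stale by the time the query actually runs.
        lookup_namespace(name).await
    }

    async fn lookup_namespace(name: &str) -> Option<String> {
        Some(name.to_owned()) // hypothetical stand-in
    }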
+ pub async fn namespace(&self, name: &str, span: Option) -> Option> { + let span_recorder = SpanRecorder::new(span); + let name = Arc::from(name.to_owned()); + let ns = self + .catalog_cache + .namespace() + .get( + Arc::clone(&name), + // we have no specific need for any tables or columns at this point, so nothing to cover + &[], + span_recorder.child_span("cache GET namespace schema"), + ) + .await?; + Some(Arc::new(QuerierNamespace::new( + Arc::clone(&self.chunk_adapter), + ns, + name, + Arc::clone(&self.exec), + self.ingester_connection.clone(), + Arc::clone(&self.query_log), + None, + Arc::clone(&self.prune_metrics), + ))) + } +} + +#[async_trait] +impl Database for QuerierRpcWriteDatabase { + fn exec(&self) -> &Executor { + &self.exec + } + + async fn namespaces(&self) -> Vec { + let catalog = &self.catalog_cache.catalog(); + Backoff::new(&self.backoff_config) + .retry_all_errors("listing namespaces", || async { + catalog.repositories().await.namespaces().list().await + }) + .await + .expect("retry forever") + } + + fn ingester_connection(&self) -> Option> { + self.ingester_connection.clone() + } +} + +#[async_trait] +impl QueryNamespaceProvider for QuerierRpcWriteDatabase { + type Db = QuerierNamespace; + + async fn db(&self, name: &str, span: Option) -> Option> { + self.namespace(name, span).await + } + + async fn acquire_semaphore(&self, span: Option) -> InstrumentedAsyncOwnedSemaphorePermit { + Arc::clone(&self.query_execution_semaphore) + .acquire_owned(span) + .await + .expect("Semaphore should not be closed by anyone") + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/querier/src/handler.rs b/querier/src/handler.rs index d22d9f64a2..c9efde1d12 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -22,7 +22,7 @@ use thiserror::Error; use tokio::task::{JoinError, JoinHandle}; use tokio_util::sync::CancellationToken; -use crate::{database::QuerierDatabase, poison::PoisonCabinet}; +use crate::{database::Database, poison::PoisonCabinet}; #[derive(Debug, Error)] #[allow(missing_copy_implementations, missing_docs)] @@ -65,7 +65,7 @@ pub struct QuerierHandlerImpl { catalog: Arc, /// Database that handles query operation - database: Arc, + database: Arc, /// The object store object_store: Arc, @@ -85,7 +85,7 @@ impl QuerierHandlerImpl { /// Initialize the Querier pub fn new( catalog: Arc, - database: Arc, + database: Arc, object_store: Arc, ) -> Self { let shutdown = CancellationToken::new(); diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index a2f8efdd34..0640bebfd8 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -170,6 +170,42 @@ pub fn create_ingester_connections_by_shard( )) } +/// Create a new set of connections to ingester2 instances +pub fn create_ingester2_connection( + ingester_addrs: &[String], + catalog_cache: Arc, + open_circuit_after_n_errors: u64, +) -> Option> { + if ingester_addrs.is_empty() { + return None; + } + + // This backoff config is used to retry requests for a specific table-scoped query. + let retry_backoff_config = BackoffConfig { + init_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(1), + base: 3.0, + deadline: Some(Duration::from_secs(10)), + }; + + // This backoff config is used to half-open the circuit after it was opened. Circuits are + // ingester-scoped. 
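Concretely, and ignoring the jitter that `Backoff` applies, the two configs in this function produce schedules like these:

    // Per-query retries (config above): 100ms, 300ms, 900ms, then capped
    // at 1s until the 10s deadline for the request as a whole expires.
    fn retry_delay_ms(attempt: u32) -> u64 {
        (100.0 * 3.0_f64.powi(attempt as i32)).min(1_000.0) as u64
    }

    // Circuit half-open probes (config below): 1s, 3s, 9s, 27s, then
    // capped at 60s, with no deadline -- probing continues indefinitely.
    fn probe_delay_secs(attempt: u32) -> u64 {
        (3.0_f64.powi(attempt as i32)).min(60.0) as u64
    }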
+ let circuit_breaker_backoff_config = BackoffConfig { + init_backoff: Duration::from_secs(1), + max_backoff: Duration::from_secs(60), + base: 3.0, + deadline: None, + }; + + Some(Arc::new(IngesterRpcWriteConnection::new( + ingester_addrs, + catalog_cache, + retry_backoff_config, + circuit_breaker_backoff_config, + open_circuit_after_n_errors, + ))) +} + /// Create a new ingester suitable for testing pub fn create_ingester_connection_for_testing() -> Arc { Arc::new(MockIngesterConnection::new()) @@ -187,7 +223,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static { #[allow(clippy::too_many_arguments)] async fn partitions( &self, - shard_indexes: &[ShardIndex], + shard_indexes: Option>, namespace_id: NamespaceId, table_id: TableId, columns: Vec, @@ -406,6 +442,160 @@ impl IngesterConnectionImpl { } } +/// Ingester connection that communicates with ingester2 instances that support the RPC write path. +#[derive(Debug)] +pub struct IngesterRpcWriteConnection { + ingester_addresses: Vec>, + flight_client: Arc, + catalog_cache: Arc, + metrics: Arc, + backoff_config: BackoffConfig, +} + +impl IngesterRpcWriteConnection { + fn new( + ingester_addrs: &[String], + catalog_cache: Arc, + backoff_config: BackoffConfig, + circuit_breaker_backoff_config: BackoffConfig, + open_circuit_after_n_errors: u64, + ) -> Self { + let flight_client = Arc::new(FlightClientImpl::new()); + let flight_client = Arc::new(CircuitBreakerFlightClient::new( + flight_client, + catalog_cache.time_provider(), + catalog_cache.metric_registry(), + open_circuit_after_n_errors, + circuit_breaker_backoff_config, + )); + + let metric_registry = catalog_cache.metric_registry(); + let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry)); + + Self { + ingester_addresses: ingester_addrs.iter().map(|s| s.as_str().into()).collect(), + flight_client, + catalog_cache, + metrics, + backoff_config, + } + } +} + +#[async_trait] +impl IngesterConnection for IngesterRpcWriteConnection { + /// Retrieve chunks from the ingester for the particular table, shard, and predicate + async fn partitions( + &self, + // Shard indexes aren't relevant for the RPC write path. This should always be `None`, but + // regardless, this implementation ignores the value. When the RPC write path is the only + // implementation, this parameter can be removed. 
+ _shard_indexes: Option>, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + predicate: &Predicate, + expected_schema: Arc, + span: Option, + ) -> Result> { + let mut span_recorder = SpanRecorder::new(span); + + let metrics = Arc::clone(&self.metrics); + + let measured_ingester_request = |ingester_address: Arc| { + let metrics = Arc::clone(&metrics); + let request = GetPartitionForIngester { + flight_client: Arc::clone(&self.flight_client), + catalog_cache: Arc::clone(&self.catalog_cache), + ingester_address: Arc::clone(&ingester_address), + namespace_id, + table_id, + columns: columns.clone(), + predicate, + expected_schema: Arc::clone(&expected_schema), + }; + + let backoff_config = self.backoff_config.clone(); + + // wrap `execute` into an additional future so that we can measure the request time + // INFO: create the measurement structure outside of the async block so cancellation is + // always measured + let measure_me = ObserveIngesterRequest::new(request.clone(), metrics, &span_recorder); + async move { + let span_recorder = measure_me + .span_recorder() + .child("ingester request (retry block)"); + + let res = Backoff::new(&backoff_config) + .retry_all_errors("ingester request", move || { + let request = request.clone(); + let span_recorder = span_recorder.child("ingester request (single try)"); + + async move { execute(request, &span_recorder).await } + }) + .await; + + match &res { + Ok(partitions) => { + let mut status = IngesterResponseOk::default(); + for p in partitions { + status.n_partitions += 1; + for c in p.chunks() { + status.n_chunks += 1; + status.n_rows += c.rows(); + } + } + + measure_me.set_ok(status); + } + Err(_) => measure_me.set_err(), + } + + res + } + }; + + let mut ingester_partitions: Vec = self + .ingester_addresses + .iter() + .cloned() + .map(move |ingester_address| measured_ingester_request(ingester_address)) + .collect::>() + .try_collect::>() + .await + .map_err(|e| { + span_recorder.error("failed"); + match e { + BackoffError::DeadlineExceeded { source, .. } => source, + } + })? + // We have a Vec> flatten to Vec<_> + .into_iter() + .flatten() + .collect(); + + ingester_partitions.sort_by_key(|p| p.partition_id); + span_recorder.ok("done"); + Ok(ingester_partitions) + } + + async fn get_write_info(&self, write_token: &str) -> Result { + let responses = self + .ingester_addresses + .iter() + .map(|ingester_address| execute_get_write_infos(ingester_address, write_token)) + .collect::>() + .try_collect::>() + .await?; + + Ok(merge_responses(responses)) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + /// Struct that names all parameters to `execute` #[derive(Debug, Clone)] struct GetPartitionForIngester<'a> { @@ -726,7 +916,7 @@ impl IngesterConnection for IngesterConnectionImpl { /// Retrieve chunks from the ingester for the particular table, shard, and predicate async fn partitions( &self, - shard_indexes: &[ShardIndex], + shard_indexes: Option>, namespace_id: NamespaceId, table_id: TableId, columns: Vec, @@ -736,6 +926,7 @@ impl IngesterConnection for IngesterConnectionImpl { ) -> Result> { // If no shard indexes are specified, no ingester addresses can be found. This is a // configuration problem somewhere. 
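Stepping back to the new `IngesterRpcWriteConnection::partitions` earlier in this hunk: it queries every configured ingester concurrently and fails the whole request if any ingester still errors after its retries. The shape of that fan-out, stripped of metrics and tracing and with illustrative types only:

    use futures::{stream::FuturesUnordered, TryStreamExt};

    async fn fan_out(addrs: &[String]) -> Result<Vec<u64>, String> {
        let nested: Vec<Vec<u64>> = addrs
            .iter()
            .map(|addr| query_one_ingester(addr)) // one future per ingester
            .collect::<FuturesUnordered<_>>() // polled concurrently
            .try_collect() // the first error aborts the whole call
            .await?;
        // Flatten the per-ingester partition lists into a single list.
        Ok(nested.into_iter().flatten().collect())
    }

    async fn query_one_ingester(_addr: &str) -> Result<Vec<u64>, String> {
        Ok(vec![]) // stand-in for the retried Flight request
    }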
+ let shard_indexes = shard_indexes.expect("A `shard_indexes` list must be specified"); assert!( !shard_indexes.is_empty(), "Called `IngesterConnection.partitions` with an empty `shard_indexes` list", @@ -802,7 +993,7 @@ impl IngesterConnection for IngesterConnectionImpl { // shard_indexes relevant to this query. let mut relevant_ingester_addresses = HashSet::new(); - for shard_index in shard_indexes { + for shard_index in &shard_indexes { match self.shard_to_ingesters.get(shard_index) { None => { return NoIngesterFoundForShardSnafu { diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs index 5eeacaa2a1..1213b72487 100644 --- a/querier/src/ingester/test_util.rs +++ b/querier/src/ingester/test_util.rs @@ -34,7 +34,7 @@ impl MockIngesterConnection { impl IngesterConnection for MockIngesterConnection { async fn partitions( &self, - _shard_indexes: &[ShardIndex], + _shard_indexes: Option>, _namespace_id: NamespaceId, _table_id: TableId, columns: Vec, diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 02bd1ae999..44304065a0 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -25,10 +25,13 @@ mod table; mod tombstone; pub use cache::CatalogCache as QuerierCatalogCache; -pub use database::{Error as QuerierDatabaseError, QuerierDatabase}; +pub use database::{ + Database, Error as QuerierDatabaseError, QuerierDatabase, QuerierRpcWriteDatabase, +}; pub use handler::{QuerierHandler, QuerierHandlerImpl}; pub use ingester::{ - create_ingester_connection_for_testing, create_ingester_connections_by_shard, + create_ingester2_connection, create_ingester_connection_for_testing, + create_ingester_connections_by_shard, flight_client::{ Error as IngesterFlightClientError, FlightClient as IngesterFlightClient, QueryData as IngesterFlightClientQueryData, diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index bb308a8cdc..9947d8f6a1 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -58,7 +58,7 @@ impl QuerierNamespace { exec: Arc, ingester_connection: Option>, query_log: Arc, - sharder: Arc>>, + sharder: Option>>>, prune_metrics: Arc, ) -> Self { let tables: HashMap<_, _> = ns @@ -66,7 +66,7 @@ impl QuerierNamespace { .iter() .map(|(table_name, cached_table)| { let table = Arc::new(QuerierTable::new(QuerierTableArgs { - sharder: Arc::clone(&sharder), + sharder: sharder.clone(), namespace_id: ns.id, namespace_name: Arc::clone(&name), namespace_retention_period: ns.retention_period, @@ -118,7 +118,7 @@ impl QuerierNamespace { exec, ingester_connection, query_log, - sharder, + Some(sharder), prune_metrics, ) } diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 5f86bd3433..6eb9cfa9b4 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -78,7 +78,7 @@ impl From for DataFusionError { /// Args to create a [`QuerierTable`]. pub struct QuerierTableArgs { - pub sharder: Arc>>, + pub sharder: Option>>>, pub namespace_id: NamespaceId, pub namespace_name: Arc, pub namespace_retention_period: Option, @@ -94,8 +94,9 @@ pub struct QuerierTableArgs { /// Table representation for the querier. #[derive(Debug)] pub struct QuerierTable { - /// Sharder to query for which shards are responsible for the table's data - sharder: Arc>>, + /// Sharder to query for which shards are responsible for the table's data. If not specified, + /// query all ingesters. 
+ sharder: Option>>>, /// Namespace the table is in namespace_name: Arc, @@ -474,14 +475,15 @@ impl QuerierTable { // determine which ingester(s) to query. // Currently, the sharder will only return one shard index per table, but in the // near future, the sharder might return more than one shard index for one table. - let shard_indexes = vec![**self + let shard_indexes = self .sharder - .shard_for_query(&self.table_name, &self.namespace_name)]; + .as_ref() + .map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]); // get any chunks from the ingester(s) let partitions_result = ingester_connection .partitions( - &shard_indexes, + shard_indexes, self.namespace_id, self.table_id, columns, From b1c5ec4dee295a49072a7476fc2837913b2be08c Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 7 Dec 2022 10:06:48 -0500 Subject: [PATCH 2/3] fix: Correct compiler errors in places I missed while running crate tests --- ioxd_querier/src/rpc/namespace.rs | 6 ++++-- querier/src/handler.rs | 2 +- querier/src/ingester/mod.rs | 2 +- querier/src/table/test_util.rs | 4 +++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 9a0a9803b7..02d21f1be4 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -82,7 +82,7 @@ mod tests { use super::*; use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService; use iox_tests::util::TestCatalog; - use querier::{create_ingester_connection_for_testing, QuerierCatalogCache}; + use querier::{create_ingester_connection_for_testing, QuerierCatalogCache, QuerierDatabase}; use tokio::runtime::Handle; /// Common retention period value we'll use in tests @@ -173,7 +173,9 @@ mod tests { ); } - async fn get_namespaces(service: &NamespaceServiceImpl) -> proto::GetNamespacesResponse { + async fn get_namespaces( + service: &NamespaceServiceImpl, + ) -> proto::GetNamespacesResponse { let request = proto::GetNamespacesRequest {}; let mut namespaces = service diff --git a/querier/src/handler.rs b/querier/src/handler.rs index c9efde1d12..602355e844 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -159,7 +159,7 @@ impl Drop for QuerierHandlerImpl { #[cfg(test)] mod tests { use super::*; - use crate::{cache::CatalogCache, create_ingester_connection_for_testing}; + use crate::{cache::CatalogCache, create_ingester_connection_for_testing, QuerierDatabase}; use data_types::ShardIndex; use iox_catalog::mem::MemCatalog; use iox_query::exec::Executor; diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 0640bebfd8..7634d453f6 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -1970,7 +1970,7 @@ mod tests { let shard_indexes: Vec<_> = shard_indexes.iter().copied().map(ShardIndex::new).collect(); ingester_conn .partitions( - &shard_indexes, + Some(shard_indexes), NamespaceId::new(1), TableId::new(2), columns, diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index 4db57d78b3..c6532741c1 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -39,7 +39,9 @@ pub async fn querier_table(catalog: &Arc, table: &Arc) - .retention_period_ns .map(|retention| Duration::from_nanos(retention as u64)); QuerierTable::new(QuerierTableArgs { - sharder: Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new))), + sharder: Some(Arc::new(JumpHash::new( + 
(0..1).map(ShardIndex::new).map(Arc::new), + ))), namespace_id: table.namespace.namespace.id, namespace_name, namespace_retention_period, From e13e668d2664976fef01321b89916b20a18da385 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 7 Dec 2022 10:53:52 -0500 Subject: [PATCH 3/3] refactor: Share more code in the querier in the RPC write path mode --- Cargo.lock | 1 - clap_blocks/Cargo.toml | 4 + clap_blocks/src/lib.rs | 1 - clap_blocks/src/querier.rs | 70 ++++- clap_blocks/src/querier_rpc_write.rs | 94 ------ influxdb_iox/Cargo.toml | 2 +- influxdb_iox/src/commands/run/all_in_one.rs | 8 +- influxdb_iox/src/commands/run/mod.rs | 16 - influxdb_iox/src/commands/run/querier.rs | 6 + .../src/commands/run/querier_rpc_write.rs | 119 -------- ioxd_querier/Cargo.toml | 1 - ioxd_querier/src/lib.rs | 111 ++----- ioxd_querier/src/rpc/namespace.rs | 26 +- ioxd_querier/src/rpc/query.rs | 8 +- ioxd_querier/src/rpc/write_info.rs | 16 +- querier/src/database.rs | 234 +++------------ querier/src/handler.rs | 9 +- querier/src/ingester/mod.rs | 283 +++++------------- querier/src/lib.rs | 7 +- 19 files changed, 246 insertions(+), 770 deletions(-) delete mode 100644 clap_blocks/src/querier_rpc_write.rs delete mode 100644 influxdb_iox/src/commands/run/querier_rpc_write.rs diff --git a/Cargo.lock b/Cargo.lock index 84b631d974..7f8506a014 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2827,7 +2827,6 @@ dependencies = [ "parquet_file", "querier", "router", - "service_common", "service_grpc_flight", "service_grpc_influxrpc", "sharder", diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml index 0f97e01cdb..7fa3508599 100644 --- a/clap_blocks/Cargo.toml +++ b/clap_blocks/Cargo.toml @@ -33,3 +33,7 @@ test_helpers = { path = "../test_helpers" } azure = ["object_store/azure"] # Optional Azure Object store support gcp = ["object_store/gcp"] # Optional GCP object store support aws = ["object_store/aws"] # Optional AWS / S3 object store support + +# Temporary feature to use the RPC write path instead of the write buffer during the transition +# away from using Kafka. +rpc_write = [] diff --git a/clap_blocks/src/lib.rs b/clap_blocks/src/lib.rs index 02fe530a74..09052ccad5 100644 --- a/clap_blocks/src/lib.rs +++ b/clap_blocks/src/lib.rs @@ -18,7 +18,6 @@ pub mod ingester; pub mod ingester2; pub mod object_store; pub mod querier; -pub mod querier_rpc_write; pub mod router; pub mod router_rpc_write; pub mod run_config; diff --git a/clap_blocks/src/querier.rs b/clap_blocks/src/querier.rs index 4b848f708f..2d5ec0c777 100644 --- a/clap_blocks/src/querier.rs +++ b/clap_blocks/src/querier.rs @@ -1,8 +1,14 @@ //! Querier-related configs. use data_types::{IngesterMapping, ShardIndex}; +use snafu::Snafu; +use std::{collections::HashMap, io, path::PathBuf, sync::Arc}; + +#[cfg(not(feature = "rpc_write"))] use serde::Deserialize; -use snafu::{ResultExt, Snafu}; -use std::{collections::HashMap, fs, io, path::PathBuf, sync::Arc}; +#[cfg(not(feature = "rpc_write"))] +use snafu::ResultExt; +#[cfg(not(feature = "rpc_write"))] +use std::fs; #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -19,6 +25,7 @@ pub enum Error { ingesters, shards, ))] + #[cfg(not(feature = "rpc_write"))] IgnoreAllRequiresEmptyConfig { ingesters: HashMap, Arc>, shards: HashMap, @@ -130,6 +137,7 @@ pub struct QuerierConfig { env = "INFLUXDB_IOX_SHARD_TO_INGESTERS_FILE", action )] + #[cfg(not(feature = "rpc_write"))] pub shard_to_ingesters_file: Option, /// JSON containing a Shard index to ingesters gRPC mapping. 
For example: @@ -199,8 +207,27 @@ pub struct QuerierConfig { env = "INFLUXDB_IOX_SHARD_TO_INGESTERS", action )] + #[cfg(not(feature = "rpc_write"))] pub shard_to_ingesters: Option, + /// gRPC address for the router to talk with the ingesters. For + /// example: + /// + /// "http://127.0.0.1:8083" + /// + /// or + /// + /// "http://10.10.10.1:8083,http://10.10.10.2:8083" + /// + /// for multiple addresses. + #[clap( + long = "ingester-addresses", + env = "INFLUXDB_IOX_INGESTER_ADDRESSES", + required = true + )] + #[cfg(feature = "rpc_write")] + pub ingester_addresses: Vec, + /// Size of the RAM cache used to store catalog metadata information in bytes. #[clap( long = "ram-pool-metadata-bytes", @@ -261,6 +288,7 @@ impl QuerierConfig { /// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to /// specify a JSON file containing shard to ingester address mappings, this returns `Err` if /// there are any problems reading, deserializing, or interpreting the file. + #[cfg(not(feature = "rpc_write"))] pub fn ingester_addresses(&self) -> Result { if let Some(file) = &self.shard_to_ingesters_file { let contents = @@ -283,6 +311,24 @@ impl QuerierConfig { } } + /// Return the querier config's ingester addresses. + // When we have switched to using the RPC write path and remove the rpc_write feature, this + // method can be changed to be infallible as clap will handle failure to parse the list of + // strings. + #[cfg(feature = "rpc_write")] + pub fn ingester_addresses(&self) -> Result { + if self.ingester_addresses.is_empty() { + Ok(IngesterAddresses::None) + } else { + Ok(IngesterAddresses::List( + self.ingester_addresses + .iter() + .map(|s| s.as_str().into()) + .collect(), + )) + } + } + /// Size of the RAM cache pool for metadata in bytes. pub fn ram_pool_metadata_bytes(&self) -> usize { self.ram_pool_metadata_bytes @@ -297,8 +343,21 @@ impl QuerierConfig { pub fn max_concurrent_queries(&self) -> usize { self.max_concurrent_queries } + + /// Whether the querier is contacting ingesters that use the RPC write path or not. + #[cfg(feature = "rpc_write")] + pub fn rpc_write(&self) -> bool { + true + } + + /// Whether the querier is contacting ingesters that use the RPC write path or not. + #[cfg(not(feature = "rpc_write"))] + pub fn rpc_write(&self) -> bool { + false + } } +#[cfg(not(feature = "rpc_write"))] fn deserialize_shard_ingester_map( contents: &str, ) -> Result, Error> { @@ -375,12 +434,16 @@ pub enum IngesterAddresses { /// A mapping from shard index to ingesters. ByShardIndex(HashMap), + /// A list of ingester2 addresses. + List(Vec>), + /// No connections, meaning only persisted data should be used. None, } #[derive(Debug, Deserialize, Default)] #[serde(rename_all = "camelCase")] +#[cfg(not(feature = "rpc_write"))] struct IngestersConfig { #[serde(default)] ignore_all: bool, @@ -392,6 +455,7 @@ struct IngestersConfig { /// Ingester config. #[derive(Debug, Deserialize)] +#[cfg(not(feature = "rpc_write"))] pub struct IngesterConfig { addr: Option>, #[serde(default)] @@ -400,6 +464,7 @@ pub struct IngesterConfig { /// Shard config. #[derive(Debug, Deserialize)] +#[cfg(not(feature = "rpc_write"))] pub struct ShardConfig { ingester: Option>, #[serde(default)] @@ -407,6 +472,7 @@ pub struct ShardConfig { } #[cfg(test)] +#[cfg(not(feature = "rpc_write"))] // These tests won't be relevant after the switch to rpc_write. 
mod tests { use super::*; use clap::Parser; diff --git a/clap_blocks/src/querier_rpc_write.rs b/clap_blocks/src/querier_rpc_write.rs deleted file mode 100644 index c87b88fecd..0000000000 --- a/clap_blocks/src/querier_rpc_write.rs +++ /dev/null @@ -1,94 +0,0 @@ -//! Querier-related config for the RPC write path. - -/// CLI config for the querier using the RPC write path -#[derive(Debug, Clone, PartialEq, Eq, clap::Parser)] -pub struct QuerierRpcWriteConfig { - /// The number of threads to use for queries. - /// - /// If not specified, defaults to the number of cores on the system - #[clap( - long = "num-query-threads", - env = "INFLUXDB_IOX_NUM_QUERY_THREADS", - action - )] - pub num_query_threads: Option, - - /// Size of memory pool used during query exec, in bytes. - /// - /// If queries attempt to allocate more than this many bytes - /// during execution, they will error with "ResourcesExhausted". - #[clap( - long = "exec-mem-pool-bytes", - env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", - default_value = "8589934592", // 8GB - action - )] - pub exec_mem_pool_bytes: usize, - - /// gRPC address for the router to talk with the ingesters. For - /// example: - /// - /// "http://127.0.0.1:8083" - /// - /// or - /// - /// "http://10.10.10.1:8083,http://10.10.10.2:8083" - /// - /// for multiple addresses. - #[clap( - long = "ingester-addresses", - env = "INFLUXDB_IOX_INGESTER_ADDRESSES", - required = true - )] - pub ingester_addresses: Vec, - - /// Size of the RAM cache used to store catalog metadata information in bytes. - #[clap( - long = "ram-pool-metadata-bytes", - env = "INFLUXDB_IOX_RAM_POOL_METADATA_BYTES", - default_value = "134217728", // 128MB - action - )] - pub ram_pool_metadata_bytes: usize, - - /// Size of the RAM cache used to store data in bytes. - #[clap( - long = "ram-pool-data-bytes", - env = "INFLUXDB_IOX_RAM_POOL_DATA_BYTES", - default_value = "1073741824", // 1GB - action - )] - pub ram_pool_data_bytes: usize, - - /// Limit the number of concurrent queries. - #[clap( - long = "max-concurrent-queries", - env = "INFLUXDB_IOX_MAX_CONCURRENT_QUERIES", - default_value = "10", - action - )] - pub max_concurrent_queries: usize, - - /// After how many ingester query errors should the querier enter circuit breaker mode? - /// - /// The querier normally contacts the ingester for any unpersisted data during query planning. - /// However, when the ingester can not be contacted for some reason, the querier will begin - /// returning results that do not include unpersisted data and enter "circuit breaker mode" - /// to avoid continually retrying the failing connection on subsequent queries. - /// - /// If circuits are open, the querier will NOT contact the ingester and no unpersisted data - /// will be presented to the user. - /// - /// Circuits will switch to "half open" after some jittered timeout and the querier will try to - /// use the ingester in question again. If this succeeds, we are back to normal, otherwise it - /// will back off exponentially before trying again (and again ...). - /// - /// In a production environment the `ingester_circuit_state` metric should be monitored. 
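The comment being deleted above describes a classic three-state circuit breaker; as a summary for reviewers (names illustrative, not the real types):

    enum CircuitState {
        /// Normal operation: the querier sends requests to the ingester.
        Closed,
        /// Too many consecutive errors: the ingester is skipped and
        /// queries proceed without unpersisted data.
        Open,
        /// Entered after a jittered timeout: one probe request is allowed.
        /// Success closes the circuit; failure reopens it with an
        /// exponentially longer timeout.
        HalfOpen,
    }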
- #[clap( - long = "ingester-circuit-breaker-threshold", - env = "INFLUXDB_IOX_INGESTER_CIRCUIT_BREAKER_THRESHOLD", - default_value = "10", - action - )] - pub ingester_circuit_breaker_threshold: u64, -} diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index ece35afb6e..fcb62da14a 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -109,4 +109,4 @@ clippy = [] # Temporary feature to use the RPC write path instead of the write buffer during the transition # away from using Kafka. -rpc_write = ["ioxd_router/rpc_write"] +rpc_write = ["ioxd_router/rpc_write", "clap_blocks/rpc_write"] diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index a4ffc74a42..8e0248a1b3 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -439,9 +439,13 @@ impl Config { }; let querier_config = QuerierConfig { - num_query_threads: None, // will be ignored + num_query_threads: None, // will be ignored + #[cfg(not(feature = "rpc_write"))] shard_to_ingesters_file: None, // will be ignored - shard_to_ingesters: None, // will be ignored + #[cfg(not(feature = "rpc_write"))] + shard_to_ingesters: None, // will be ignored + #[cfg(feature = "rpc_write")] + ingester_addresses: vec![], // will be ignored ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes, ram_pool_data_bytes: querier_ram_pool_data_bytes, max_concurrent_queries: querier_max_concurrent_queries, diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 0e9b8d032c..0da818e318 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -9,8 +9,6 @@ mod ingester; mod ingester2; mod main; mod querier; -#[cfg(feature = "rpc_write")] -mod querier_rpc_write; mod router; #[cfg(feature = "rpc_write")] mod router_rpc_write; @@ -28,10 +26,6 @@ pub enum Error { #[snafu(display("Error in querier subcommand: {}", source))] QuerierError { source: querier::Error }, - #[cfg(feature = "rpc_write")] - #[snafu(display("Error in querier-rpc-write subcommand: {}", source))] - QuerierRpcWriteError { source: querier_rpc_write::Error }, - #[snafu(display("Error in router subcommand: {}", source))] RouterError { source: router::Error }, @@ -72,8 +66,6 @@ impl Config { Some(Command::Compactor(config)) => config.run_config.logging_config(), Some(Command::GarbageCollector(config)) => config.run_config.logging_config(), Some(Command::Querier(config)) => config.run_config.logging_config(), - #[cfg(feature = "rpc_write")] - Some(Command::QuerierRpcWrite(config)) => config.run_config.logging_config(), Some(Command::Router(config)) => config.run_config.logging_config(), #[cfg(feature = "rpc_write")] Some(Command::RouterRpcWrite(config)) => config.run_config.logging_config(), @@ -94,10 +86,6 @@ enum Command { /// Run the server in querier mode Querier(querier::Config), - /// Run the server in querier mode using the RPC write path. 
- #[cfg(feature = "rpc_write")] - QuerierRpcWrite(querier_rpc_write::Config), - /// Run the server in router mode Router(router::Config), @@ -134,10 +122,6 @@ pub async fn command(config: Config) -> Result<()> { .await .context(GarbageCollectorSnafu), Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu), - #[cfg(feature = "rpc_write")] - Some(Command::QuerierRpcWrite(config)) => querier_rpc_write::command(config) - .await - .context(QuerierRpcWriteSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), #[cfg(feature = "rpc_write")] Some(Command::RouterRpcWrite(config)) => router_rpc_write::command(config) diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index 6620cda8c8..e7c36963f2 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -96,6 +96,12 @@ pub async fn command(config: Config) -> Result<(), Error> { let num_threads = num_query_threads.unwrap_or_else(num_cpus::get); info!(%num_threads, "using specified number of threads per thread pool"); + if config.querier_config.rpc_write() { + info!("using the RPC write path"); + } else { + info!("using the write buffer path"); + } + let ingester_addresses = config.querier_config.ingester_addresses()?; info!(?ingester_addresses, "using ingester addresses"); diff --git a/influxdb_iox/src/commands/run/querier_rpc_write.rs b/influxdb_iox/src/commands/run/querier_rpc_write.rs deleted file mode 100644 index 975ff36443..0000000000 --- a/influxdb_iox/src/commands/run/querier_rpc_write.rs +++ /dev/null @@ -1,119 +0,0 @@ -//! Command line options for running the querier using the RPC write path. - -use super::main; -use crate::process_info::setup_metric_registry; -use clap_blocks::{ - catalog_dsn::CatalogDsnConfig, object_store::make_object_store, - querier_rpc_write::QuerierRpcWriteConfig, run_config::RunConfig, -}; -use iox_query::exec::Executor; -use iox_time::{SystemProvider, TimeProvider}; -use ioxd_common::{ - server_type::{CommonServerState, CommonServerStateError}, - Service, -}; -use ioxd_querier::{create_querier_grpc_write_server_type, QuerierRpcWriteServerTypeArgs}; -use object_store::DynObjectStore; -use object_store_metrics::ObjectStoreMetrics; -use observability_deps::tracing::*; -use std::sync::Arc; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum Error { - #[error("Run: {0}")] - Run(#[from] main::Error), - - #[error("Invalid config: {0}")] - InvalidConfigCommon(#[from] CommonServerStateError), - - #[error("Catalog error: {0}")] - Catalog(#[from] iox_catalog::interface::Error), - - #[error("Catalog DSN error: {0}")] - CatalogDsn(#[from] clap_blocks::catalog_dsn::Error), - - #[error("Cannot parse object store config: {0}")] - ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), - - #[error("Querier error: {0}")] - Querier(#[from] ioxd_querier::Error), -} - -#[derive(Debug, clap::Parser)] -#[clap( - name = "run", - about = "Runs in querier mode using the RPC write path", - long_about = "Run the IOx querier server.\n\nThe configuration options below can be \ - set either with the command line flags or with the specified environment \ - variable. If there is a file named '.env' in the current working directory, \ - it is sourced before loading the configuration. 
- -Configuration is loaded from the following sources (highest precedence first): - - command line arguments - - user set environment variables - - .env file contents - - pre-configured default values" -)] -pub struct Config { - #[clap(flatten)] - pub(crate) run_config: RunConfig, - - #[clap(flatten)] - pub(crate) catalog_dsn: CatalogDsnConfig, - - #[clap(flatten)] - pub(crate) querier_config: QuerierRpcWriteConfig, -} - -pub async fn command(config: Config) -> Result<(), Error> { - let common_state = CommonServerState::from_config(config.run_config.clone())?; - - let time_provider = Arc::new(SystemProvider::new()) as Arc; - let metric_registry = setup_metric_registry(); - - let catalog = config - .catalog_dsn - .get_catalog("querier_rpc_write", Arc::clone(&metric_registry)) - .await?; - - let object_store = make_object_store(config.run_config.object_store_config()) - .map_err(Error::ObjectStoreParsing)?; - // Decorate the object store with a metric recorder. - let object_store: Arc = Arc::new(ObjectStoreMetrics::new( - object_store, - Arc::clone(&time_provider), - &metric_registry, - )); - - let time_provider = Arc::new(SystemProvider::new()); - - let num_threads = config - .querier_config - .num_query_threads - .unwrap_or_else(num_cpus::get); - info!(%num_threads, "using specified number of threads per thread pool"); - - info!(ingester_addresses=?config.querier_config.ingester_addresses, "using ingester addresses"); - - let exec = Arc::new(Executor::new( - num_threads, - config.querier_config.exec_mem_pool_bytes, - )); - - let server_type = create_querier_grpc_write_server_type(QuerierRpcWriteServerTypeArgs { - common_state: &common_state, - metric_registry: Arc::clone(&metric_registry), - catalog, - object_store, - exec, - time_provider, - querier_config: config.querier_config, - }) - .await?; - - info!("starting querier"); - - let services = vec![Service::create(server_type, common_state.run_config())]; - Ok(main::main(common_state, services, metric_registry).await?) 
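With the separate `querier-rpc-write` command deleted, the surviving `querier` run command (patched earlier) picks the mode at startup from `rpc_write()` and `ingester_addresses()`. A sketch of that dispatch, assuming the three `IngesterAddresses` variants visible later in this patch (`None`, `ByShardIndex`, `List`); the types are simplified and the `describe` helper is illustrative, not real code:

use std::collections::HashMap;

// Simplified stand-in for clap_blocks' IngesterAddresses (the real map is
// ShardIndex -> IngesterMapping and the real list holds Arc<str>).
#[derive(Debug)]
enum IngesterAddresses {
    None,
    ByShardIndex(HashMap<i32, String>),
    List(Vec<String>),
}

fn describe(addrs: &IngesterAddresses) -> &'static str {
    match addrs {
        IngesterAddresses::None => "no ingesters: serve persisted data only",
        IngesterAddresses::ByShardIndex(_) => "write buffer path: route by shard",
        IngesterAddresses::List(_) => "RPC write path: query every ingester",
    }
}

fn main() {
    let addrs = IngesterAddresses::List(vec!["http://127.0.0.1:8083".into()]);
    println!("{}", describe(&addrs));
}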
-} diff --git a/ioxd_querier/Cargo.toml b/ioxd_querier/Cargo.toml index aa9c298c0c..17c60fff2a 100644 --- a/ioxd_querier/Cargo.toml +++ b/ioxd_querier/Cargo.toml @@ -17,7 +17,6 @@ object_store = "0.5.2" querier = { path = "../querier" } iox_query = { path = "../iox_query" } router = { path = "../router" } -service_common = { path = "../service_common" } service_grpc_flight = { path = "../service_grpc_flight" } service_grpc_influxrpc = { path = "../service_grpc_influxrpc" } sharder = { path = "../sharder" } diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index 100fb7663b..1ff67af8d0 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -1,8 +1,5 @@ use async_trait::async_trait; -use clap_blocks::{ - querier::{IngesterAddresses, QuerierConfig}, - querier_rpc_write::QuerierRpcWriteConfig, -}; +use clap_blocks::querier::{IngesterAddresses, QuerierConfig}; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use iox_query::exec::{Executor, ExecutorType}; @@ -18,11 +15,9 @@ use ioxd_common::{ use metric::Registry; use object_store::DynObjectStore; use querier::{ - create_ingester2_connection, create_ingester_connections_by_shard, Database, - QuerierCatalogCache, QuerierDatabase, QuerierHandler, QuerierHandlerImpl, - QuerierRpcWriteDatabase, QuerierServer, + create_ingester_connections, QuerierCatalogCache, QuerierDatabase, QuerierHandler, + QuerierHandlerImpl, QuerierServer, }; -use service_common::QueryNamespaceProvider; use std::{ fmt::{Debug, Display}, sync::Arc, @@ -33,24 +28,22 @@ use trace::TraceCollector; mod rpc; -pub struct QuerierServerType { - database: Arc, +pub struct QuerierServerType { + database: Arc, server: QuerierServer, trace_collector: Option>, } -impl std::fmt::Debug - for QuerierServerType -{ +impl std::fmt::Debug for QuerierServerType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Querier") } } -impl QuerierServerType { +impl QuerierServerType { pub fn new( server: QuerierServer, - database: Arc, + database: Arc, common_state: &CommonServerState, ) -> Self { Self { @@ -62,9 +55,7 @@ impl QuerierServerType< } #[async_trait] -impl ServerType - for QuerierServerType -{ +impl ServerType for QuerierServerType { /// Return the [`metric::Registry`] used by the compactor. fn metric_registry(&self) -> Arc { self.server.metric_registry() @@ -148,18 +139,6 @@ impl HttpApiErrorSource for IoxHttpError { } } -/// Arguments required to create a [`ServerType`] for the querier using the RPC write path. -#[derive(Debug)] -pub struct QuerierRpcWriteServerTypeArgs<'a> { - pub common_state: &'a CommonServerState, - pub metric_registry: Arc, - pub catalog: Arc, - pub object_store: Arc, - pub exec: Arc, - pub time_provider: Arc, - pub querier_config: QuerierRpcWriteConfig, -} - /// Arguments required to create a [`ServerType`] for the querier. 
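Most of the churn in ioxd_querier/src/lib.rs above is removing the type parameter from `QuerierServerType` now that only one database type remains. In miniature (names illustrative), the trade is between a type parameter every caller must carry and a concrete type that shortens every signature, which is the direction this patch takes:

use std::sync::Arc;

struct QuerierDatabase; // stand-in for the one concrete database type

// Before: generic over the database, so the parameter spreads to every
// impl block and call site.
struct ServerTypeGeneric<D> {
    database: Arc<D>,
}

// After: concrete, once trait-object indirection is no longer needed.
struct ServerTypeConcrete {
    database: Arc<QuerierDatabase>,
}

fn main() {
    let _generic = ServerTypeGeneric { database: Arc::new(QuerierDatabase) };
    let _concrete = ServerTypeConcrete { database: Arc::new(QuerierDatabase) };
}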
#[derive(Debug)] pub struct QuerierServerTypeArgs<'a> { @@ -179,64 +158,6 @@ pub enum Error { Querier(#[from] querier::QuerierDatabaseError), } -/// Instantiate a querier server that uses the RPC write path -pub async fn create_querier_grpc_write_server_type( - args: QuerierRpcWriteServerTypeArgs<'_>, -) -> Result, Error> { - let catalog_cache = Arc::new(QuerierCatalogCache::new( - Arc::clone(&args.catalog), - args.time_provider, - Arc::clone(&args.metric_registry), - Arc::clone(&args.object_store), - args.querier_config.ram_pool_metadata_bytes, - args.querier_config.ram_pool_data_bytes, - &Handle::current(), - )); - - // register cached object store with the execution context - let parquet_store = catalog_cache.parquet_store(); - let existing = args - .exec - .new_context(ExecutorType::Query) - .inner() - .runtime_env() - .register_object_store( - "iox", - parquet_store.id(), - Arc::clone(parquet_store.object_store()), - ); - assert!(existing.is_none()); - - let ingester_connection = create_ingester2_connection( - &args.querier_config.ingester_addresses, - Arc::clone(&catalog_cache), - args.querier_config.ingester_circuit_breaker_threshold, - ); - - let database = Arc::new( - QuerierRpcWriteDatabase::new( - catalog_cache, - Arc::clone(&args.metric_registry), - args.exec, - ingester_connection, - args.querier_config.max_concurrent_queries, - ) - .await?, - ); - let querier_handler = Arc::new(QuerierHandlerImpl::new( - args.catalog, - Arc::clone(&database) as _, - Arc::clone(&args.object_store), - )); - - let querier = QuerierServer::new(args.metric_registry, querier_handler); - Ok(Arc::new(QuerierServerType::new( - querier, - database, - args.common_state, - ))) -} - /// Instantiate a querier server pub async fn create_querier_server_type( args: QuerierServerTypeArgs<'_>, @@ -267,8 +188,15 @@ pub async fn create_querier_server_type( let ingester_connection = match args.ingester_addresses { IngesterAddresses::None => None, - IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections_by_shard( - map, + IngesterAddresses::ByShardIndex(map) => Some(create_ingester_connections( + Some(map), + None, + Arc::clone(&catalog_cache), + args.querier_config.ingester_circuit_breaker_threshold, + )), + IngesterAddresses::List(list) => Some(create_ingester_connections( + None, + Some(list), Arc::clone(&catalog_cache), args.querier_config.ingester_circuit_breaker_threshold, )), @@ -281,12 +209,13 @@ pub async fn create_querier_server_type( args.exec, ingester_connection, args.querier_config.max_concurrent_queries(), + args.querier_config.rpc_write(), ) .await?, ); let querier_handler = Arc::new(QuerierHandlerImpl::new( args.catalog, - Arc::clone(&database) as _, + Arc::clone(&database), Arc::clone(&args.object_store), )); diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 02d21f1be4..0667f64333 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -7,12 +7,12 @@ use data_types::Namespace; use generated_types::influxdata::iox::namespace::v1 as proto; -use querier::Database; +use querier::QuerierDatabase; use std::sync::Arc; /// Acquire a [`NamespaceService`](proto::namespace_service_server::NamespaceService) gRPC service implementation. 
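The deleted RPC-write constructor above duplicated the cached-parquet-store setup, including the `assert!(existing.is_none())` guard against registering the same store twice. A generic sketch of that register-once pattern, with a plain HashMap standing in for the DataFusion runtime registry:

use std::collections::HashMap;

fn register_once(registry: &mut HashMap<String, String>, id: &str, store: String) {
    let existing = registry.insert(id.to_string(), store);
    // Mirrors the deleted code's `assert!(existing.is_none())`: a double
    // registration indicates a setup bug, so fail loudly.
    assert!(existing.is_none(), "object store `{id}` registered twice");
}

fn main() {
    let mut registry = HashMap::new();
    register_once(&mut registry, "iox", "cached parquet store".to_string());
}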
-pub fn namespace_service( - server: Arc, +pub fn namespace_service( + server: Arc, ) -> proto::namespace_service_server::NamespaceServiceServer< impl proto::namespace_service_server::NamespaceService, > { @@ -20,12 +20,12 @@ pub fn namespace_service( } #[derive(Debug)] -struct NamespaceServiceImpl { - server: Arc, +struct NamespaceServiceImpl { + server: Arc, } -impl NamespaceServiceImpl { - pub fn new(server: Arc) -> Self { +impl NamespaceServiceImpl { + pub fn new(server: Arc) -> Self { Self { server } } } @@ -40,9 +40,7 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace { } #[tonic::async_trait] -impl proto::namespace_service_server::NamespaceService - for NamespaceServiceImpl -{ +impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl { async fn get_namespaces( &self, _request: tonic::Request, @@ -82,7 +80,7 @@ mod tests { use super::*; use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService; use iox_tests::util::TestCatalog; - use querier::{create_ingester_connection_for_testing, QuerierCatalogCache, QuerierDatabase}; + use querier::{create_ingester_connection_for_testing, QuerierCatalogCache}; use tokio::runtime::Handle; /// Common retention period value we'll use in tests @@ -109,6 +107,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + false, ) .await .unwrap(), @@ -144,6 +143,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + false, ) .await .unwrap(), @@ -173,9 +173,7 @@ mod tests { ); } - async fn get_namespaces( - service: &NamespaceServiceImpl, - ) -> proto::GetNamespacesResponse { + async fn get_namespaces(service: &NamespaceServiceImpl) -> proto::GetNamespacesResponse { let request = proto::GetNamespacesRequest {}; let mut namespaces = service diff --git a/ioxd_querier/src/rpc/query.rs b/ioxd_querier/src/rpc/query.rs index d578cdc4d3..9a55b82af3 100644 --- a/ioxd_querier/src/rpc/query.rs +++ b/ioxd_querier/src/rpc/query.rs @@ -4,14 +4,12 @@ use arrow_flight::flight_service_server::{ FlightService as Flight, FlightServiceServer as FlightServer, }; use generated_types::storage_server::{Storage, StorageServer}; -use service_common::QueryNamespaceProvider; +use querier::QuerierDatabase; -pub fn make_flight_server(server: Arc) -> FlightServer { +pub fn make_flight_server(server: Arc) -> FlightServer { service_grpc_flight::make_server(server) } -pub fn make_storage_server( - server: Arc, -) -> StorageServer { +pub fn make_storage_server(server: Arc) -> StorageServer { service_grpc_influxrpc::make_server(server) } diff --git a/ioxd_querier/src/rpc/write_info.rs b/ioxd_querier/src/rpc/write_info.rs index bbd3ff9491..74a804a3f9 100644 --- a/ioxd_querier/src/rpc/write_info.rs +++ b/ioxd_querier/src/rpc/write_info.rs @@ -4,29 +4,29 @@ use generated_types::influxdata::iox::ingester::v1::{ self as proto, write_info_service_server::{WriteInfoService, WriteInfoServiceServer}, }; -use querier::Database; +use querier::QuerierDatabase; use std::sync::Arc; /// Acquire a [`WriteInfoService`] gRPC service implementation. 
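namespace.rs keeps a small catalog-to-proto conversion (`namespace_to_proto`) while dropping its generics. A self-contained sketch of that conversion shape, with stand-in types since the real `proto::Namespace` is generated and the real `Namespace` lives in data_types:

// Stand-ins for data_types::Namespace and the generated proto::Namespace.
struct Namespace {
    id: i64,
    name: String,
}

struct ProtoNamespace {
    id: i64,
    name: String,
}

fn namespace_to_proto(namespace: Namespace) -> ProtoNamespace {
    ProtoNamespace {
        id: namespace.id,
        name: namespace.name,
    }
}

fn main() {
    let ns = namespace_to_proto(Namespace { id: 1, name: "ns1".into() });
    assert_eq!(ns.id, 1);
    println!("{} -> proto", ns.name);
}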
-pub fn write_info_service( - server: Arc, +pub fn write_info_service( + server: Arc, ) -> WriteInfoServiceServer { WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server)) } #[derive(Debug)] -struct QuerierWriteInfoServiceImpl { - server: Arc, +struct QuerierWriteInfoServiceImpl { + server: Arc, } -impl QuerierWriteInfoServiceImpl { - pub fn new(server: Arc) -> Self { +impl QuerierWriteInfoServiceImpl { + pub fn new(server: Arc) -> Self { Self { server } } } #[tonic::async_trait] -impl WriteInfoService for QuerierWriteInfoServiceImpl { +impl WriteInfoService for QuerierWriteInfoServiceImpl { async fn get_write_info( &self, request: tonic::Request, diff --git a/querier/src/database.rs b/querier/src/database.rs index a5a4be72b9..59e708c01f 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -12,7 +12,7 @@ use iox_query::exec::Executor; use service_common::QueryNamespaceProvider; use sharder::JumpHash; use snafu::Snafu; -use std::{collections::BTreeSet, fmt::Debug, sync::Arc}; +use std::{collections::BTreeSet, sync::Arc}; use trace::span::{Span, SpanRecorder}; use tracker::{ AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, @@ -34,19 +34,6 @@ pub enum Error { NoShards, } -/// Shared Querier Database behavior whether using the RPC write path or not -#[async_trait] -pub trait Database: Debug + Send + Sync + 'static { - /// Executor access - fn exec(&self) -> &Executor; - - /// Return all namespaces this querier knows about - async fn namespaces(&self) -> Vec; - - /// Return connection to ingester(s) to get and aggregate information from them - fn ingester_connection(&self) -> Option>; -} - /// Database for the querier. /// /// Contains all namespaces. @@ -82,7 +69,8 @@ pub struct QuerierDatabase { query_execution_semaphore: Arc, /// Sharder to determine which ingesters to query for a particular table and namespace. - sharder: Arc>>, + /// Only relevant when using the write buffer; will be None if using RPC write ingesters. + sharder: Option>>>, /// Chunk prune metrics. prune_metrics: Arc, @@ -104,27 +92,6 @@ impl QueryNamespaceProvider for QuerierDatabase { } } -#[async_trait] -impl Database for QuerierDatabase { - fn exec(&self) -> &Executor { - &self.exec - } - - async fn namespaces(&self) -> Vec { - let catalog = &self.catalog_cache.catalog(); - Backoff::new(&self.backoff_config) - .retry_all_errors("listing namespaces", || async { - catalog.repositories().await.namespaces().list().await - }) - .await - .expect("retry forever") - } - - fn ingester_connection(&self) -> Option> { - self.ingester_connection.clone() - } -} - impl QuerierDatabase { /// The maximum value for `max_concurrent_queries` that is allowed. 
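The key data change in database.rs is that the sharder field becomes an `Option`: populated for the write buffer path, `None` for RPC write, where the querier fans out to every ingester instead of routing by shard (see the `if rpc_write` construction just below). A sketch of that shape, with a stand-in sharder type:

use std::sync::Arc;

struct JumpHash; // stand-in for sharder::JumpHash<Arc<ShardIndex>>

fn build_sharder(rpc_write: bool) -> Option<Arc<JumpHash>> {
    if rpc_write {
        // RPC write path: no shard routing; every ingester is queried.
        None
    } else {
        // Write buffer path: shard-aware routing stays in place.
        Some(Arc::new(JumpHash))
    }
}

fn main() {
    assert!(build_sharder(true).is_none());
    assert!(build_sharder(false).is_some());
}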
/// @@ -140,6 +107,7 @@ impl QuerierDatabase { exec: Arc, ingester_connection: Option>, max_concurrent_queries: usize, + rpc_write: bool, ) -> Result { assert!( max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX, @@ -162,9 +130,13 @@ impl QuerierDatabase { let query_execution_semaphore = Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries)); - let sharder = Arc::new( - create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?, - ); + let sharder = if rpc_write { + None + } else { + Some(Arc::new( + create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?, + )) + }; let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry)); @@ -206,10 +178,31 @@ impl QuerierDatabase { Arc::clone(&self.exec), self.ingester_connection.clone(), Arc::clone(&self.query_log), - Some(Arc::clone(&self.sharder)), + self.sharder.clone(), Arc::clone(&self.prune_metrics), ))) } + + /// Return all namespaces this querier knows about + pub async fn namespaces(&self) -> Vec { + let catalog = &self.catalog_cache.catalog(); + Backoff::new(&self.backoff_config) + .retry_all_errors("listing namespaces", || async { + catalog.repositories().await.namespaces().list().await + }) + .await + .expect("retry forever") + } + + /// Return connection to ingester(s) to get and aggregate information from them + pub fn ingester_connection(&self) -> Option> { + self.ingester_connection.clone() + } + + /// Executor + pub(crate) fn exec(&self) -> &Executor { + &self.exec + } } pub async fn create_sharder( @@ -241,163 +234,6 @@ pub async fn create_sharder( Ok(JumpHash::new(shard_indexes.into_iter().map(Arc::new))) } -/// Database for the querier using the RPC write path. -/// -/// Contains all namespaces. -#[derive(Debug)] -pub struct QuerierRpcWriteDatabase { - /// Backoff config for IO operations. - backoff_config: BackoffConfig, - - /// Catalog cache. - catalog_cache: Arc, - - /// Adapter to create chunks. - chunk_adapter: Arc, - - /// Metric registry - #[allow(dead_code)] - metric_registry: Arc, - - /// Executor for queries. - exec: Arc, - - /// Connection to ingester(s) - ingester_connection: Option>, - - /// Query log. - query_log: Arc, - - /// Semaphore that limits the number of namespaces in used at the time by the query subsystem. - /// - /// This should be a 1-to-1 relation to the number of active queries. - /// - /// If the same namespace is requested twice for different queries, it is counted twice. - query_execution_semaphore: Arc, - - /// Chunk prune metrics. - prune_metrics: Arc, -} - -impl QuerierRpcWriteDatabase { - /// The maximum value for `max_concurrent_queries` that is allowed. - /// - /// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore - /// creation beyond that will panic. The tokio limit is not exposed though so we pick a - /// reasonable but smaller number. - pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize; - - /// Create new database. 
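`create_sharder` builds a `JumpHash` over the catalog's shard indexes. For intuition, here is the classic jump consistent hash algorithm (Lamping & Veach) that gives such sharders their name; this is the textbook algorithm, not necessarily the exact implementation in the `sharder` crate:

// Jump consistent hash: maps a 64-bit key to one of `num_buckets` buckets
// with minimal reshuffling when the bucket count changes.
fn jump_hash(mut key: u64, num_buckets: u32) -> u32 {
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    while j < num_buckets as i64 {
        b = j;
        key = key.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / ((key >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

fn main() {
    // The same key always lands in the same bucket for a fixed bucket count.
    assert_eq!(jump_hash(42, 10), jump_hash(42, 10));
    println!("key 42 -> bucket {}", jump_hash(42, 10));
}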
- pub async fn new( - catalog_cache: Arc, - metric_registry: Arc, - exec: Arc, - ingester_connection: Option>, - max_concurrent_queries: usize, - ) -> Result { - assert!( - max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX, - "`max_concurrent_queries` ({}) > `max_concurrent_queries_MAX` ({})", - max_concurrent_queries, - Self::MAX_CONCURRENT_QUERIES_MAX, - ); - - let backoff_config = BackoffConfig::default(); - - let chunk_adapter = Arc::new(ChunkAdapter::new( - Arc::clone(&catalog_cache), - Arc::clone(&metric_registry), - )); - let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider())); - let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new( - &metric_registry, - &[("semaphore", "query_execution")], - )); - let query_execution_semaphore = - Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries)); - - let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry)); - - Ok(Self { - backoff_config, - catalog_cache, - chunk_adapter, - metric_registry, - exec, - ingester_connection, - query_log, - query_execution_semaphore, - prune_metrics, - }) - } - - /// Get namespace if it exists. - /// - /// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER - /// a semaphore permit was acquired since this lowers the chance that we obtain stale data. - pub async fn namespace(&self, name: &str, span: Option) -> Option> { - let span_recorder = SpanRecorder::new(span); - let name = Arc::from(name.to_owned()); - let ns = self - .catalog_cache - .namespace() - .get( - Arc::clone(&name), - // we have no specific need for any tables or columns at this point, so nothing to cover - &[], - span_recorder.child_span("cache GET namespace schema"), - ) - .await?; - Some(Arc::new(QuerierNamespace::new( - Arc::clone(&self.chunk_adapter), - ns, - name, - Arc::clone(&self.exec), - self.ingester_connection.clone(), - Arc::clone(&self.query_log), - None, - Arc::clone(&self.prune_metrics), - ))) - } -} - -#[async_trait] -impl Database for QuerierRpcWriteDatabase { - fn exec(&self) -> &Executor { - &self.exec - } - - async fn namespaces(&self) -> Vec { - let catalog = &self.catalog_cache.catalog(); - Backoff::new(&self.backoff_config) - .retry_all_errors("listing namespaces", || async { - catalog.repositories().await.namespaces().list().await - }) - .await - .expect("retry forever") - } - - fn ingester_connection(&self) -> Option> { - self.ingester_connection.clone() - } -} - -#[async_trait] -impl QueryNamespaceProvider for QuerierRpcWriteDatabase { - type Db = QuerierNamespace; - - async fn db(&self, name: &str, span: Option) -> Option> { - self.namespace(name, span).await - } - - async fn acquire_semaphore(&self, span: Option) -> InstrumentedAsyncOwnedSemaphorePermit { - Arc::clone(&self.query_execution_semaphore) - .acquire_owned(span) - .await - .expect("Semaphore should not be closed by anyone") - } -} - #[cfg(test)] mod tests { use super::*; @@ -426,6 +262,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1), + false, ) .await .unwrap(); @@ -450,6 +287,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + false, ) .await, Error::NoShards @@ -475,6 +313,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + false, ) .await .unwrap(); @@ -504,6 +343,7 @@ mod tests { catalog.exec(), 
            Some(create_ingester_connection_for_testing()),
            QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+           false,
        )
        .await
        .unwrap();
diff --git a/querier/src/handler.rs b/querier/src/handler.rs
index 602355e844..5de83819ed 100644
--- a/querier/src/handler.rs
+++ b/querier/src/handler.rs
@@ -22,7 +22,7 @@ use thiserror::Error;
 use tokio::task::{JoinError, JoinHandle};
 use tokio_util::sync::CancellationToken;
 
-use crate::{database::Database, poison::PoisonCabinet};
+use crate::{database::QuerierDatabase, poison::PoisonCabinet};
 
 #[derive(Debug, Error)]
 #[allow(missing_copy_implementations, missing_docs)]
@@ -65,7 +65,7 @@ pub struct QuerierHandlerImpl {
     catalog: Arc<dyn Catalog>,
 
     /// Database that handles query operation
-    database: Arc<dyn Database>,
+    database: Arc<QuerierDatabase>,
 
     /// The object store
     object_store: Arc<DynObjectStore>,
@@ -85,7 +85,7 @@ impl QuerierHandlerImpl {
     /// Initialize the Querier
     pub fn new(
         catalog: Arc<dyn Catalog>,
-        database: Arc<dyn Database>,
+        database: Arc<QuerierDatabase>,
         object_store: Arc<DynObjectStore>,
     ) -> Self {
         let shutdown = CancellationToken::new();
@@ -159,7 +159,7 @@ impl Drop for QuerierHandlerImpl {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{cache::CatalogCache, create_ingester_connection_for_testing, QuerierDatabase};
+    use crate::{cache::CatalogCache, create_ingester_connection_for_testing};
     use data_types::ShardIndex;
     use iox_catalog::mem::MemCatalog;
     use iox_query::exec::Executor;
@@ -224,6 +224,7 @@ mod tests {
             exec,
             Some(create_ingester_connection_for_testing()),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            false,
         )
         .await
         .unwrap(),
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 7634d453f6..97cee9ca44 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -139,9 +139,10 @@ pub enum Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// Create a new set of connections given a map of shard indexes to Ingester configurations
-pub fn create_ingester_connections_by_shard(
-    shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
+/// Create a new set of connections given ingester configurations
+pub fn create_ingester_connections(
+    shard_to_ingesters: Option<HashMap<ShardIndex, IngesterMapping>>,
+    ingester_addresses: Option<Vec<Arc<str>>>,
     catalog_cache: Arc<CatalogCache>,
     open_circuit_after_n_errors: u64,
 ) -> Arc<dyn IngesterConnection> {
@@ -161,49 +162,29 @@ pub fn create_ingester_connections_by_shard(
         deadline: None,
     };
 
-    Arc::new(IngesterConnectionImpl::by_shard(
-        shard_to_ingesters,
-        catalog_cache,
-        retry_backoff_config,
-        circuit_breaker_backoff_config,
-        open_circuit_after_n_errors,
-    ))
-}
-
-/// Create a new set of connections to ingester2 instances
-pub fn create_ingester2_connection(
-    ingester_addrs: &[String],
-    catalog_cache: Arc<CatalogCache>,
-    open_circuit_after_n_errors: u64,
-) -> Option<Arc<dyn IngesterConnection>> {
-    if ingester_addrs.is_empty() {
-        return None;
+    // Exactly one of `shard_to_ingesters` or `ingester_addresses` must be specified.
+    // `shard_to_ingesters` uses the Kafka write buffer path.
+    // `ingester_addresses` uses the RPC write path.
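The three comment lines above state an invariant the type system cannot enforce, and the match that follows panics on the two invalid states. As a design note only (not what this patch does), the same contract can be made unrepresentable with a two-variant enum; a hypothetical sketch with simplified types:

use std::collections::HashMap;

enum IngesterSource {
    ByShard(HashMap<i32, String>), // write buffer: shard index -> ingester
    ByAddr(Vec<String>),           // RPC write: flat address list
}

fn dispatch(source: IngesterSource) -> &'static str {
    // No panicking arms: the (None, None) and (Some, Some) combinations
    // simply cannot be constructed.
    match source {
        IngesterSource::ByShard(_) => "IngesterConnectionImpl::by_shard",
        IngesterSource::ByAddr(_) => "IngesterConnectionImpl::by_addrs",
    }
}

fn main() {
    let source = IngesterSource::ByAddr(vec!["http://127.0.0.1:8083".into()]);
    println!("{}", dispatch(source));
}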
+ match (shard_to_ingesters, ingester_addresses) { + (None, None) => panic!("Neither shard_to_ingesters nor ingester_addresses was specified!"), + (Some(_), Some(_)) => { + panic!("Both shard_to_ingesters and ingester_addresses were specified!") + } + (Some(shard_to_ingesters), None) => Arc::new(IngesterConnectionImpl::by_shard( + shard_to_ingesters, + catalog_cache, + retry_backoff_config, + circuit_breaker_backoff_config, + open_circuit_after_n_errors, + )), + (None, Some(ingester_addresses)) => Arc::new(IngesterConnectionImpl::by_addrs( + ingester_addresses, + catalog_cache, + retry_backoff_config, + circuit_breaker_backoff_config, + open_circuit_after_n_errors, + )), } - - // This backoff config is used to retry requests for a specific table-scoped query. - let retry_backoff_config = BackoffConfig { - init_backoff: Duration::from_millis(100), - max_backoff: Duration::from_secs(1), - base: 3.0, - deadline: Some(Duration::from_secs(10)), - }; - - // This backoff config is used to half-open the circuit after it was opened. Circuits are - // ingester-scoped. - let circuit_breaker_backoff_config = BackoffConfig { - init_backoff: Duration::from_secs(1), - max_backoff: Duration::from_secs(60), - base: 3.0, - deadline: None, - }; - - Some(Arc::new(IngesterRpcWriteConnection::new( - ingester_addrs, - catalog_cache, - retry_backoff_config, - circuit_breaker_backoff_config, - open_circuit_after_n_errors, - ))) } /// Create a new ingester suitable for testing @@ -440,21 +421,10 @@ impl IngesterConnectionImpl { backoff_config, } } -} -/// Ingester connection that communicates with ingester2 instances that support the RPC write path. -#[derive(Debug)] -pub struct IngesterRpcWriteConnection { - ingester_addresses: Vec>, - flight_client: Arc, - catalog_cache: Arc, - metrics: Arc, - backoff_config: BackoffConfig, -} - -impl IngesterRpcWriteConnection { - fn new( - ingester_addrs: &[String], + /// Create a new set of connections given a list of ingester2 addresses. + pub fn by_addrs( + ingester_addresses: Vec>, catalog_cache: Arc, backoff_config: BackoffConfig, circuit_breaker_backoff_config: BackoffConfig, @@ -473,7 +443,8 @@ impl IngesterRpcWriteConnection { let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry)); Self { - ingester_addresses: ingester_addrs.iter().map(|s| s.as_str().into()).collect(), + shard_to_ingesters: HashMap::new(), + unique_ingester_addresses: ingester_addresses.into_iter().collect(), flight_client, catalog_cache, metrics, @@ -482,120 +453,6 @@ impl IngesterRpcWriteConnection { } } -#[async_trait] -impl IngesterConnection for IngesterRpcWriteConnection { - /// Retrieve chunks from the ingester for the particular table, shard, and predicate - async fn partitions( - &self, - // Shard indexes aren't relevant for the RPC write path. This should always be `None`, but - // regardless, this implementation ignores the value. When the RPC write path is the only - // implementation, this parameter can be removed. 
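The deleted `create_ingester2_connection` above carried its own copies of the two `BackoffConfig`s, now shared via the unified constructor. For intuition, the per-query retry config (init 100ms, max 1s, base 3.0, 10s deadline) produces roughly the idealized schedule below; the real `Backoff` also applies jitter, so actual delays vary:

use std::time::Duration;

fn main() {
    let init = Duration::from_millis(100);
    let max = Duration::from_secs(1);
    let base = 3.0_f64;
    let deadline = Duration::from_secs(10);

    // 100ms -> 300ms -> 900ms -> capped at 1s thereafter, until the
    // cumulative wait would exceed the 10s deadline.
    let mut delay = init;
    let mut total = Duration::ZERO;
    while total + delay <= deadline {
        total += delay;
        println!("retry after {delay:?} (cumulative wait {total:?})");
        delay = delay.mul_f64(base).min(max);
    }
}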
-        _shard_indexes: Option<Vec<ShardIndex>>,
-        namespace_id: NamespaceId,
-        table_id: TableId,
-        columns: Vec<String>,
-        predicate: &Predicate,
-        expected_schema: Arc<Schema>,
-        span: Option<Span>,
-    ) -> Result<Vec<IngesterPartition>> {
-        let mut span_recorder = SpanRecorder::new(span);
-
-        let metrics = Arc::clone(&self.metrics);
-
-        let measured_ingester_request = |ingester_address: Arc<str>| {
-            let metrics = Arc::clone(&metrics);
-            let request = GetPartitionForIngester {
-                flight_client: Arc::clone(&self.flight_client),
-                catalog_cache: Arc::clone(&self.catalog_cache),
-                ingester_address: Arc::clone(&ingester_address),
-                namespace_id,
-                table_id,
-                columns: columns.clone(),
-                predicate,
-                expected_schema: Arc::clone(&expected_schema),
-            };
-
-            let backoff_config = self.backoff_config.clone();
-
-            // wrap `execute` into an additional future so that we can measure the request time
-            // INFO: create the measurement structure outside of the async block so cancellation is
-            // always measured
-            let measure_me = ObserveIngesterRequest::new(request.clone(), metrics, &span_recorder);
-            async move {
-                let span_recorder = measure_me
-                    .span_recorder()
-                    .child("ingester request (retry block)");
-
-                let res = Backoff::new(&backoff_config)
-                    .retry_all_errors("ingester request", move || {
-                        let request = request.clone();
-                        let span_recorder = span_recorder.child("ingester request (single try)");
-
-                        async move { execute(request, &span_recorder).await }
-                    })
-                    .await;
-
-                match &res {
-                    Ok(partitions) => {
-                        let mut status = IngesterResponseOk::default();
-                        for p in partitions {
-                            status.n_partitions += 1;
-                            for c in p.chunks() {
-                                status.n_chunks += 1;
-                                status.n_rows += c.rows();
-                            }
-                        }
-
-                        measure_me.set_ok(status);
-                    }
-                    Err(_) => measure_me.set_err(),
-                }
-
-                res
-            }
-        };
-
-        let mut ingester_partitions: Vec<IngesterPartition> = self
-            .ingester_addresses
-            .iter()
-            .cloned()
-            .map(move |ingester_address| measured_ingester_request(ingester_address))
-            .collect::<FuturesUnordered<_>>()
-            .try_collect::<Vec<_>>()
-            .await
-            .map_err(|e| {
-                span_recorder.error("failed");
-                match e {
-                    BackoffError::DeadlineExceeded { source, .. } => source,
-                }
-            })?
-            // We have a Vec<Vec<_>> flatten to Vec<_>
-            .into_iter()
-            .flatten()
-            .collect();
-
-        ingester_partitions.sort_by_key(|p| p.partition_id);
-        span_recorder.ok("done");
-        Ok(ingester_partitions)
-    }
-
-    async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse> {
-        let responses = self
-            .ingester_addresses
-            .iter()
-            .map(|ingester_address| execute_get_write_infos(ingester_address, write_token))
-            .collect::<FuturesUnordered<_>>()
-            .try_collect::<Vec<_>>()
-            .await?;
-
-        Ok(merge_responses(responses))
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self as &dyn Any
-    }
-}
-
 /// Struct that names all parameters to `execute`
 #[derive(Debug, Clone)]
 struct GetPartitionForIngester<'a> {
@@ -924,13 +781,49 @@ impl IngesterConnection for IngesterConnectionImpl {
         expected_schema: Arc<Schema>,
         span: Option<Span>,
     ) -> Result<Vec<IngesterPartition>> {
-        // If no shard indexes are specified, no ingester addresses can be found. This is a
-        // configuration problem somewhere.
-        let shard_indexes = shard_indexes.expect("A `shard_indexes` list must be specified");
-        assert!(
-            !shard_indexes.is_empty(),
-            "Called `IngesterConnection.partitions` with an empty `shard_indexes` list",
-        );
+        let relevant_ingester_addresses = match shard_indexes {
+            // If shard indexes is None, we're using the RPC write path, and all ingesters should
+            // be queried.
+            None => self.unique_ingester_addresses.clone(),
+            // If shard indexes is Some([]), no ingester addresses can be found. This is a
+            // configuration problem somewhere.
+ Some(shard_indexes) if shard_indexes.is_empty() => { + panic!("Called `IngesterConnection.partitions` with an empty `shard_indexes` list"); + } + // Otherwise, we're using the write buffer and need to look up the ingesters to contact + // by their shard index. + Some(shard_indexes) => { + // Look up the ingesters needed for the shard. Collect into a HashSet to avoid making + // multiple requests to the same ingester if that ingester is responsible for multiple + // shard_indexes relevant to this query. + let mut relevant_ingester_addresses = HashSet::new(); + + for shard_index in &shard_indexes { + match self.shard_to_ingesters.get(shard_index) { + None => { + return NoIngesterFoundForShardSnafu { + shard_index: *shard_index, + } + .fail() + } + Some(mapping) => match mapping { + IngesterMapping::Addr(addr) => { + relevant_ingester_addresses.insert(Arc::clone(addr)); + } + IngesterMapping::Ignore => (), + IngesterMapping::NotMapped => { + return ShardNotMappedSnafu { + shard_index: *shard_index, + } + .fail() + } + }, + } + } + relevant_ingester_addresses + } + }; + let mut span_recorder = SpanRecorder::new(span); let metrics = Arc::clone(&self.metrics); @@ -988,34 +881,6 @@ impl IngesterConnection for IngesterConnectionImpl { } }; - // Look up the ingesters needed for the shard. Collect into a HashSet to avoid making - // multiple requests to the same ingester if that ingester is responsible for multiple - // shard_indexes relevant to this query. - let mut relevant_ingester_addresses = HashSet::new(); - - for shard_index in &shard_indexes { - match self.shard_to_ingesters.get(shard_index) { - None => { - return NoIngesterFoundForShardSnafu { - shard_index: *shard_index, - } - .fail() - } - Some(mapping) => match mapping { - IngesterMapping::Addr(addr) => { - relevant_ingester_addresses.insert(Arc::clone(addr)); - } - IngesterMapping::Ignore => (), - IngesterMapping::NotMapped => { - return ShardNotMappedSnafu { - shard_index: *shard_index, - } - .fail() - } - }, - } - } - let mut ingester_partitions: Vec = relevant_ingester_addresses .into_iter() .map(move |ingester_address| measured_ingester_request(ingester_address)) diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 44304065a0..ea6c973598 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -25,13 +25,10 @@ mod table; mod tombstone; pub use cache::CatalogCache as QuerierCatalogCache; -pub use database::{ - Database, Error as QuerierDatabaseError, QuerierDatabase, QuerierRpcWriteDatabase, -}; +pub use database::{Error as QuerierDatabaseError, QuerierDatabase}; pub use handler::{QuerierHandler, QuerierHandlerImpl}; pub use ingester::{ - create_ingester2_connection, create_ingester_connection_for_testing, - create_ingester_connections_by_shard, + create_ingester_connection_for_testing, create_ingester_connections, flight_client::{ Error as IngesterFlightClientError, FlightClient as IngesterFlightClient, QueryData as IngesterFlightClientQueryData,
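The final match arm above (write buffer path) collects addresses into a `HashSet` so an ingester serving several relevant shards is contacted once. A self-contained sketch of that lookup, with the `IngesterMapping` variants mirrored from the diff and simplified types:

use std::collections::{HashMap, HashSet};

#[derive(Clone)]
enum IngesterMapping {
    Addr(String), // contact this ingester
    Ignore,       // deliberately skip this shard
    NotMapped,    // configuration error
}

fn relevant_addresses(
    shard_to_ingesters: &HashMap<i32, IngesterMapping>,
    shard_indexes: &[i32],
) -> Result<HashSet<String>, String> {
    let mut addresses = HashSet::new();
    for shard_index in shard_indexes {
        match shard_to_ingesters.get(shard_index) {
            None => return Err(format!("no ingester found for shard {shard_index}")),
            Some(IngesterMapping::Addr(addr)) => {
                addresses.insert(addr.clone());
            }
            Some(IngesterMapping::Ignore) => (),
            Some(IngesterMapping::NotMapped) => {
                return Err(format!("shard {shard_index} not mapped"));
            }
        }
    }
    Ok(addresses)
}

fn main() {
    let map = HashMap::from([
        (1, IngesterMapping::Addr("http://10.0.0.1:8083".into())),
        (2, IngesterMapping::Addr("http://10.0.0.1:8083".into())), // same ingester
        (3, IngesterMapping::Ignore),
    ]);
    let addrs = relevant_addresses(&map, &[1, 2, 3]).unwrap();
    assert_eq!(addrs.len(), 1); // deduplicated
}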
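Whichever way the address set is chosen, `partitions` then fans out one request per ingester concurrently, collecting through `FuturesUnordered` and flattening the per-ingester `Vec`s, as in the surviving code above. A runnable miniature of that collect-and-flatten pattern; it assumes `tokio` and `futures` as dependencies:

use futures::{stream::FuturesUnordered, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), String> {
    let ingester_addresses = vec!["http://10.0.0.1:8083", "http://10.0.0.2:8083"];

    // One future per ingester; each yields a Vec of results (partitions in
    // the real code, address lengths in this toy version).
    let per_ingester: Vec<Vec<usize>> = ingester_addresses
        .iter()
        .map(|addr| async move { Ok::<_, String>(vec![addr.len()]) })
        .collect::<FuturesUnordered<_>>()
        .try_collect()
        .await?;

    // We have a Vec<Vec<_>>; flatten to Vec<_>, as the diff's comment says.
    let flat: Vec<usize> = per_ingester.into_iter().flatten().collect();
    println!("{} results from {} ingesters", flat.len(), ingester_addresses.len());
    Ok(())
}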