feat: Add a mode for the querier to use ingester2 instead of ingester, behind the rpc_write feature flag

pull/24376/head
Carol (Nichols || Goulding) 2022-12-05 12:27:52 -05:00
parent c97b025751
commit 9166ace796
17 changed files with 751 additions and 70 deletions

Cargo.lock (generated)
View File

@@ -2827,6 +2827,7 @@ dependencies = [
"parquet_file",
"querier",
"router",
"service_common",
"service_grpc_flight",
"service_grpc_influxrpc",
"sharder",

View File

@@ -18,6 +18,7 @@ pub mod ingester;
pub mod ingester2;
pub mod object_store;
pub mod querier;
pub mod querier_rpc_write;
pub mod router;
pub mod router_rpc_write;
pub mod run_config;

View File

@@ -0,0 +1,94 @@
//! Querier-related config for the RPC write path.
/// CLI config for the querier using the RPC write path
#[derive(Debug, Clone, PartialEq, Eq, clap::Parser)]
pub struct QuerierRpcWriteConfig {
/// The number of threads to use for queries.
///
/// If not specified, defaults to the number of cores on the system.
#[clap(
long = "num-query-threads",
env = "INFLUXDB_IOX_NUM_QUERY_THREADS",
action
)]
pub num_query_threads: Option<usize>,
/// Size of memory pool used during query exec, in bytes.
///
/// If queries attempt to allocate more than this many bytes
/// during execution, they will error with "ResourcesExhausted".
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = "8589934592", // 8GB
action
)]
pub exec_mem_pool_bytes: usize,
/// gRPC address for the querier to talk with the ingester(s). For
/// example:
///
/// "http://127.0.0.1:8083"
///
/// or
///
/// "http://10.10.10.1:8083,http://10.10.10.2:8083"
///
/// for multiple addresses.
#[clap(
long = "ingester-addresses",
env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
required = true
)]
pub ingester_addresses: Vec<String>,
/// Size of the RAM cache used to store catalog metadata information in bytes.
#[clap(
long = "ram-pool-metadata-bytes",
env = "INFLUXDB_IOX_RAM_POOL_METADATA_BYTES",
default_value = "134217728", // 128MB
action
)]
pub ram_pool_metadata_bytes: usize,
/// Size of the RAM cache used to store data in bytes.
#[clap(
long = "ram-pool-data-bytes",
env = "INFLUXDB_IOX_RAM_POOL_DATA_BYTES",
default_value = "1073741824", // 1GB
action
)]
pub ram_pool_data_bytes: usize,
/// Limit the number of concurrent queries.
#[clap(
long = "max-concurrent-queries",
env = "INFLUXDB_IOX_MAX_CONCURRENT_QUERIES",
default_value = "10",
action
)]
pub max_concurrent_queries: usize,
/// After how many ingester query errors should the querier enter circuit breaker mode?
///
/// The querier normally contacts the ingester for any unpersisted data during query planning.
/// However, when the ingester cannot be contacted for some reason, the querier will begin
/// returning results that do not include unpersisted data and enter "circuit breaker mode"
/// to avoid continually retrying the failing connection on subsequent queries.
///
/// If circuits are open, the querier will NOT contact the ingester and no unpersisted data
/// will be presented to the user.
///
/// Circuits switch to "half open" after a jittered timeout, and the querier then tries the
/// ingester in question again. If this succeeds, operation returns to normal; otherwise the
/// querier backs off exponentially before trying again (and again ...).
///
/// In a production environment the `ingester_circuit_state` metric should be monitored.
#[clap(
long = "ingester-circuit-breaker-threshold",
env = "INFLUXDB_IOX_INGESTER_CIRCUIT_BREAKER_THRESHOLD",
default_value = "10",
action
)]
pub ingester_circuit_breaker_threshold: u64,
}
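Since `QuerierRpcWriteConfig` derives `clap::Parser`, the flags above can be exercised in isolation. A minimal sketch (the binary name and argv values here are hypothetical; only `--ingester-addresses` is required):

use clap::Parser;
use clap_blocks::querier_rpc_write::QuerierRpcWriteConfig;

fn main() {
    let config = QuerierRpcWriteConfig::parse_from([
        "querier-rpc-write",
        "--ingester-addresses",
        "http://127.0.0.1:8083",
        "--max-concurrent-queries",
        "20",
    ]);
    assert_eq!(config.max_concurrent_queries, 20);
    // Unset flags fall back to the documented defaults.
    assert_eq!(config.exec_mem_pool_bytes, 8_589_934_592); // 8GB
    assert_eq!(config.ingester_circuit_breaker_threshold, 10);
    assert!(config.num_query_threads.is_none());
}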

View File

@@ -9,6 +9,8 @@ mod ingester;
mod ingester2;
mod main;
mod querier;
#[cfg(feature = "rpc_write")]
mod querier_rpc_write;
mod router;
#[cfg(feature = "rpc_write")]
mod router_rpc_write;
@@ -26,6 +28,10 @@ pub enum Error {
#[snafu(display("Error in querier subcommand: {}", source))]
QuerierError { source: querier::Error },
#[cfg(feature = "rpc_write")]
#[snafu(display("Error in querier-rpc-write subcommand: {}", source))]
QuerierRpcWriteError { source: querier_rpc_write::Error },
#[snafu(display("Error in router subcommand: {}", source))]
RouterError { source: router::Error },
@@ -66,6 +72,8 @@ impl Config {
Some(Command::Compactor(config)) => config.run_config.logging_config(),
Some(Command::GarbageCollector(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
#[cfg(feature = "rpc_write")]
Some(Command::QuerierRpcWrite(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
#[cfg(feature = "rpc_write")]
Some(Command::RouterRpcWrite(config)) => config.run_config.logging_config(),
@@ -86,6 +94,10 @@ enum Command {
/// Run the server in querier mode
Querier(querier::Config),
/// Run the server in querier mode using the RPC write path.
#[cfg(feature = "rpc_write")]
QuerierRpcWrite(querier_rpc_write::Config),
/// Run the server in router mode
Router(router::Config),
@@ -122,6 +134,10 @@ pub async fn command(config: Config) -> Result<()> {
.await
.context(GarbageCollectorSnafu),
Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu),
#[cfg(feature = "rpc_write")]
Some(Command::QuerierRpcWrite(config)) => querier_rpc_write::command(config)
.await
.context(QuerierRpcWriteSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
#[cfg(feature = "rpc_write")]
Some(Command::RouterRpcWrite(config)) => router_rpc_write::command(config)

View File

@@ -0,0 +1,119 @@
//! Command line options for running the querier using the RPC write path.
use super::main;
use crate::process_info::setup_metric_registry;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store,
querier_rpc_write::QuerierRpcWriteConfig, run_config::RunConfig,
};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_querier::{create_querier_grpc_write_server_type, QuerierRpcWriteServerTypeArgs};
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] main::Error),
#[error("Invalid config: {0}")]
InvalidConfigCommon(#[from] CommonServerStateError),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("Querier error: {0}")]
Querier(#[from] ioxd_querier::Error),
}
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in querier mode using the RPC write path",
long_about = "Run the IOx querier server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
#[clap(flatten)]
pub(crate) querier_config: QuerierRpcWriteConfig,
}
pub async fn command(config: Config) -> Result<(), Error> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let time_provider = Arc::new(SystemProvider::new()) as Arc<dyn TimeProvider>;
let metric_registry = setup_metric_registry();
let catalog = config
.catalog_dsn
.get_catalog("querier_rpc_write", Arc::clone(&metric_registry))
.await?;
let object_store = make_object_store(config.run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?;
// Decorate the object store with a metric recorder.
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreMetrics::new(
object_store,
Arc::clone(&time_provider),
&metric_registry,
));
let time_provider = Arc::new(SystemProvider::new());
let num_threads = config
.querier_config
.num_query_threads
.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
info!(ingester_addresses=?config.querier_config.ingester_addresses, "using ingester addresses");
let exec = Arc::new(Executor::new(
num_threads,
config.querier_config.exec_mem_pool_bytes,
));
let server_type = create_querier_grpc_write_server_type(QuerierRpcWriteServerTypeArgs {
common_state: &common_state,
metric_registry: Arc::clone(&metric_registry),
catalog,
object_store,
exec,
time_provider,
querier_config: config.querier_config,
})
.await?;
info!("starting querier");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metric_registry).await?)
}

View File

@@ -17,6 +17,7 @@ object_store = "0.5.2"
querier = { path = "../querier" }
iox_query = { path = "../iox_query" }
router = { path = "../router" }
service_common = { path = "../service_common" }
service_grpc_flight = { path = "../service_grpc_flight" }
service_grpc_influxrpc = { path = "../service_grpc_influxrpc" }
sharder = { path = "../sharder" }

View File

@@ -1,5 +1,8 @@
use async_trait::async_trait;
use clap_blocks::querier::{IngesterAddresses, QuerierConfig};
use clap_blocks::{
querier::{IngesterAddresses, QuerierConfig},
querier_rpc_write::QuerierRpcWriteConfig,
};
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::{Executor, ExecutorType};
@@ -15,9 +18,11 @@ use ioxd_common::{
use metric::Registry;
use object_store::DynObjectStore;
use querier::{
create_ingester_connections_by_shard, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
create_ingester2_connection, create_ingester_connections_by_shard, Database,
QuerierCatalogCache, QuerierDatabase, QuerierHandler, QuerierHandlerImpl,
QuerierRpcWriteDatabase, QuerierServer,
};
use service_common::QueryNamespaceProvider;
use std::{
fmt::{Debug, Display},
sync::Arc,
@@ -28,22 +33,24 @@ use trace::TraceCollector;
mod rpc;
pub struct QuerierServerType<C: QuerierHandler> {
database: Arc<QuerierDatabase>,
pub struct QuerierServerType<C: QuerierHandler, D: Database + QueryNamespaceProvider> {
database: Arc<D>,
server: QuerierServer<C>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<C: QuerierHandler> std::fmt::Debug for QuerierServerType<C> {
impl<C: QuerierHandler, D: Database + QueryNamespaceProvider> std::fmt::Debug
for QuerierServerType<C, D>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Querier")
}
}
impl<C: QuerierHandler> QuerierServerType<C> {
impl<C: QuerierHandler, D: Database + QueryNamespaceProvider> QuerierServerType<C, D> {
pub fn new(
server: QuerierServer<C>,
database: Arc<QuerierDatabase>,
database: Arc<D>,
common_state: &CommonServerState,
) -> Self {
Self {
@@ -55,7 +62,9 @@ impl<C: QuerierHandler> QuerierServerType<C> {
}
#[async_trait]
impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServerType<C> {
impl<C: QuerierHandler + std::fmt::Debug + 'static, D: Database + QueryNamespaceProvider> ServerType
for QuerierServerType<C, D>
{
/// Return the [`metric::Registry`] used by the querier.
fn metric_registry(&self) -> Arc<Registry> {
self.server.metric_registry()
@@ -139,6 +148,18 @@ impl HttpApiErrorSource for IoxHttpError {
}
}
/// Arguments required to create a [`ServerType`] for the querier using the RPC write path.
#[derive(Debug)]
pub struct QuerierRpcWriteServerTypeArgs<'a> {
pub common_state: &'a CommonServerState,
pub metric_registry: Arc<metric::Registry>,
pub catalog: Arc<dyn Catalog>,
pub object_store: Arc<DynObjectStore>,
pub exec: Arc<Executor>,
pub time_provider: Arc<dyn TimeProvider>,
pub querier_config: QuerierRpcWriteConfig,
}
/// Arguments required to create a [`ServerType`] for the querier.
#[derive(Debug)]
pub struct QuerierServerTypeArgs<'a> {
@@ -158,6 +179,64 @@ pub enum Error {
Querier(#[from] querier::QuerierDatabaseError),
}
/// Instantiate a querier server that uses the RPC write path
pub async fn create_querier_grpc_write_server_type(
args: QuerierRpcWriteServerTypeArgs<'_>,
) -> Result<Arc<dyn ServerType>, Error> {
let catalog_cache = Arc::new(QuerierCatalogCache::new(
Arc::clone(&args.catalog),
args.time_provider,
Arc::clone(&args.metric_registry),
Arc::clone(&args.object_store),
args.querier_config.ram_pool_metadata_bytes,
args.querier_config.ram_pool_data_bytes,
&Handle::current(),
));
// register cached object store with the execution context
let parquet_store = catalog_cache.parquet_store();
let existing = args
.exec
.new_context(ExecutorType::Query)
.inner()
.runtime_env()
.register_object_store(
"iox",
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
);
assert!(existing.is_none());
let ingester_connection = create_ingester2_connection(
&args.querier_config.ingester_addresses,
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
);
let database = Arc::new(
QuerierRpcWriteDatabase::new(
catalog_cache,
Arc::clone(&args.metric_registry),
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries,
)
.await?,
);
let querier_handler = Arc::new(QuerierHandlerImpl::new(
args.catalog,
Arc::clone(&database) as _,
Arc::clone(&args.object_store),
));
let querier = QuerierServer::new(args.metric_registry, querier_handler);
Ok(Arc::new(QuerierServerType::new(
querier,
database,
args.common_state,
)))
}
/// Instantiate a querier server
pub async fn create_querier_server_type(
args: QuerierServerTypeArgs<'_>,
@@ -207,7 +286,7 @@ pub async fn create_querier_server_type(
);
let querier_handler = Arc::new(QuerierHandlerImpl::new(
args.catalog,
Arc::clone(&database),
Arc::clone(&database) as _,
Arc::clone(&args.object_store),
));
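The `as _` added here lets the compiler infer the unsizing coercion from `Arc<QuerierDatabase>` to the `Arc<dyn Database>` that `QuerierHandlerImpl::new` now accepts. A self-contained illustration of the idiom with toy types (not the IOx ones):

use std::sync::Arc;

trait Database {}

struct QuerierDatabase;
impl Database for QuerierDatabase {}

fn new_handler(_db: Arc<dyn Database>) {}

fn main() {
    let database = Arc::new(QuerierDatabase);
    // `as _` asks the compiler to infer the target type, here Arc<dyn Database>.
    new_handler(Arc::clone(&database) as _);
}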

View File

@@ -7,12 +7,12 @@
use data_types::Namespace;
use generated_types::influxdata::iox::namespace::v1 as proto;
use querier::QuerierDatabase;
use querier::Database;
use std::sync::Arc;
/// Acquire a [`NamespaceService`](proto::namespace_service_server::NamespaceService) gRPC service implementation.
pub fn namespace_service(
server: Arc<QuerierDatabase>,
pub fn namespace_service<S: Database + Send + Sync + 'static>(
server: Arc<S>,
) -> proto::namespace_service_server::NamespaceServiceServer<
impl proto::namespace_service_server::NamespaceService,
> {
@@ -20,12 +20,12 @@ pub fn namespace_service(
}
#[derive(Debug)]
struct NamespaceServiceImpl {
server: Arc<QuerierDatabase>,
struct NamespaceServiceImpl<S> {
server: Arc<S>,
}
impl NamespaceServiceImpl {
pub fn new(server: Arc<QuerierDatabase>) -> Self {
impl<S> NamespaceServiceImpl<S> {
pub fn new(server: Arc<S>) -> Self {
Self { server }
}
}
@@ -40,7 +40,9 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace {
}
#[tonic::async_trait]
impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl {
impl<S: Database + Send + Sync + 'static> proto::namespace_service_server::NamespaceService
for NamespaceServiceImpl<S>
{
async fn get_namespaces(
&self,
_request: tonic::Request<proto::GetNamespacesRequest>,

View File

@@ -4,12 +4,14 @@ use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use generated_types::storage_server::{Storage, StorageServer};
use querier::QuerierDatabase;
use service_common::QueryNamespaceProvider;
pub fn make_flight_server(server: Arc<QuerierDatabase>) -> FlightServer<impl Flight> {
pub fn make_flight_server<S: QueryNamespaceProvider>(server: Arc<S>) -> FlightServer<impl Flight> {
service_grpc_flight::make_server(server)
}
pub fn make_storage_server(server: Arc<QuerierDatabase>) -> StorageServer<impl Storage> {
pub fn make_storage_server<S: QueryNamespaceProvider>(
server: Arc<S>,
) -> StorageServer<impl Storage> {
service_grpc_influxrpc::make_server(server)
}
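Because both constructors are now generic over `QueryNamespaceProvider`, the same rpc module serves either database flavor. Hypothetical wiring, assuming a `database: Arc<QuerierRpcWriteDatabase>` built by the server-type constructor above:

// Either database type satisfies the QueryNamespaceProvider bound.
let flight = make_flight_server(Arc::clone(&database));
let storage = make_storage_server(Arc::clone(&database));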

View File

@@ -4,29 +4,29 @@ use generated_types::influxdata::iox::ingester::v1::{
self as proto,
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
};
use querier::QuerierDatabase;
use querier::Database;
use std::sync::Arc;
/// Acquire a [`WriteInfoService`] gRPC service implementation.
pub fn write_info_service(
server: Arc<QuerierDatabase>,
pub fn write_info_service<S: Database + Send + Sync + 'static>(
server: Arc<S>,
) -> WriteInfoServiceServer<impl WriteInfoService> {
WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server))
}
#[derive(Debug)]
struct QuerierWriteInfoServiceImpl {
server: Arc<QuerierDatabase>,
struct QuerierWriteInfoServiceImpl<S> {
server: Arc<S>,
}
impl QuerierWriteInfoServiceImpl {
pub fn new(server: Arc<QuerierDatabase>) -> Self {
impl<S> QuerierWriteInfoServiceImpl<S> {
pub fn new(server: Arc<S>) -> Self {
Self { server }
}
}
#[tonic::async_trait]
impl WriteInfoService for QuerierWriteInfoServiceImpl {
impl<S: Database + Send + Sync + 'static> WriteInfoService for QuerierWriteInfoServiceImpl<S> {
async fn get_write_info(
&self,
request: tonic::Request<proto::GetWriteInfoRequest>,

View File

@@ -12,7 +12,7 @@ use iox_query::exec::Executor;
use service_common::QueryNamespaceProvider;
use sharder::JumpHash;
use snafu::Snafu;
use std::{collections::BTreeSet, sync::Arc};
use std::{collections::BTreeSet, fmt::Debug, sync::Arc};
use trace::span::{Span, SpanRecorder};
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@@ -34,6 +34,19 @@ pub enum Error {
NoShards,
}
/// Shared Querier Database behavior whether using the RPC write path or not
#[async_trait]
pub trait Database: Debug + Send + Sync + 'static {
/// Executor access
fn exec(&self) -> &Executor;
/// Return all namespaces this querier knows about
async fn namespaces(&self) -> Vec<Namespace>;
/// Return the connection to the ingester(s) used to fetch and aggregate information from them
fn ingester_connection(&self) -> Option<Arc<dyn IngesterConnection>>;
}
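Both `QuerierDatabase` and the new `QuerierRpcWriteDatabase` implement this trait (see below), so code that only needs these three capabilities can take `&dyn Database` and stay agnostic of the write path. A hedged sketch of such a consumer (it assumes `data_types::Namespace` exposes a `name` field):

async fn log_known_namespaces(db: &dyn Database) {
    for namespace in db.namespaces().await {
        println!("known namespace: {}", namespace.name);
    }
    if db.ingester_connection().is_none() {
        // No ingester configured: only persisted data is queryable.
        println!("running without an ingester connection");
    }
}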
/// Database for the querier.
///
/// Contains all namespaces.
@@ -91,6 +104,27 @@ impl QueryNamespaceProvider for QuerierDatabase {
}
}
#[async_trait]
impl Database for QuerierDatabase {
fn exec(&self) -> &Executor {
&self.exec
}
async fn namespaces(&self) -> Vec<Namespace> {
let catalog = &self.catalog_cache.catalog();
Backoff::new(&self.backoff_config)
.retry_all_errors("listing namespaces", || async {
catalog.repositories().await.namespaces().list().await
})
.await
.expect("retry forever")
}
fn ingester_connection(&self) -> Option<Arc<dyn IngesterConnection>> {
self.ingester_connection.clone()
}
}
impl QuerierDatabase {
/// The maximum value for `max_concurrent_queries` that is allowed.
///
@@ -172,31 +206,10 @@ impl QuerierDatabase {
Arc::clone(&self.exec),
self.ingester_connection.clone(),
Arc::clone(&self.query_log),
Arc::clone(&self.sharder),
Some(Arc::clone(&self.sharder)),
Arc::clone(&self.prune_metrics),
)))
}
/// Return all namespaces this querier knows about
pub async fn namespaces(&self) -> Vec<Namespace> {
let catalog = &self.catalog_cache.catalog();
Backoff::new(&self.backoff_config)
.retry_all_errors("listing namespaces", || async {
catalog.repositories().await.namespaces().list().await
})
.await
.expect("retry forever")
}
/// Return the connection to the ingester(s) used to fetch and aggregate information from them
pub fn ingester_connection(&self) -> Option<Arc<dyn IngesterConnection>> {
self.ingester_connection.clone()
}
/// Executor
pub(crate) fn exec(&self) -> &Executor {
&self.exec
}
}
pub async fn create_sharder(
@@ -228,6 +241,163 @@ pub async fn create_sharder(
Ok(JumpHash::new(shard_indexes.into_iter().map(Arc::new)))
}
/// Database for the querier using the RPC write path.
///
/// Contains all namespaces.
#[derive(Debug)]
pub struct QuerierRpcWriteDatabase {
/// Backoff config for IO operations.
backoff_config: BackoffConfig,
/// Catalog cache.
catalog_cache: Arc<CatalogCache>,
/// Adapter to create chunks.
chunk_adapter: Arc<ChunkAdapter>,
/// Metric registry
#[allow(dead_code)]
metric_registry: Arc<metric::Registry>,
/// Executor for queries.
exec: Arc<Executor>,
/// Connection to ingester(s)
ingester_connection: Option<Arc<dyn IngesterConnection>>,
/// Query log.
query_log: Arc<QueryLog>,
/// Semaphore that limits the number of namespaces in use at a time by the query subsystem.
///
/// This should be a 1-to-1 relation to the number of active queries.
///
/// If the same namespace is requested twice for different queries, it is counted twice.
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
/// Chunk prune metrics.
prune_metrics: Arc<PruneMetrics>,
}
impl QuerierRpcWriteDatabase {
/// The maximum value for `max_concurrent_queries` that is allowed.
///
/// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore
/// creation beyond that will panic. The tokio limit is not exposed though so we pick a
/// reasonable but smaller number.
pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize;
/// Create new database.
pub async fn new(
catalog_cache: Arc<CatalogCache>,
metric_registry: Arc<metric::Registry>,
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
max_concurrent_queries: usize,
) -> Result<Self, Error> {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
"`max_concurrent_queries` ({}) > `max_concurrent_queries_MAX` ({})",
max_concurrent_queries,
Self::MAX_CONCURRENT_QUERIES_MAX,
);
let backoff_config = BackoffConfig::default();
let chunk_adapter = Arc::new(ChunkAdapter::new(
Arc::clone(&catalog_cache),
Arc::clone(&metric_registry),
));
let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider()));
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
&metric_registry,
&[("semaphore", "query_execution")],
));
let query_execution_semaphore =
Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry));
Ok(Self {
backoff_config,
catalog_cache,
chunk_adapter,
metric_registry,
exec,
ingester_connection,
query_log,
query_execution_semaphore,
prune_metrics,
})
}
/// Get namespace if it exists.
///
/// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER
/// a semaphore permit was acquired since this lowers the chance that we obtain stale data.
pub async fn namespace(&self, name: &str, span: Option<Span>) -> Option<Arc<QuerierNamespace>> {
let span_recorder = SpanRecorder::new(span);
let name = Arc::from(name.to_owned());
let ns = self
.catalog_cache
.namespace()
.get(
Arc::clone(&name),
// we have no specific need for any tables or columns at this point, so nothing to cover
&[],
span_recorder.child_span("cache GET namespace schema"),
)
.await?;
Some(Arc::new(QuerierNamespace::new(
Arc::clone(&self.chunk_adapter),
ns,
name,
Arc::clone(&self.exec),
self.ingester_connection.clone(),
Arc::clone(&self.query_log),
None,
Arc::clone(&self.prune_metrics),
)))
}
}
#[async_trait]
impl Database for QuerierRpcWriteDatabase {
fn exec(&self) -> &Executor {
&self.exec
}
async fn namespaces(&self) -> Vec<Namespace> {
let catalog = &self.catalog_cache.catalog();
Backoff::new(&self.backoff_config)
.retry_all_errors("listing namespaces", || async {
catalog.repositories().await.namespaces().list().await
})
.await
.expect("retry forever")
}
fn ingester_connection(&self) -> Option<Arc<dyn IngesterConnection>> {
self.ingester_connection.clone()
}
}
#[async_trait]
impl QueryNamespaceProvider for QuerierRpcWriteDatabase {
type Db = QuerierNamespace;
async fn db(&self, name: &str, span: Option<Span>) -> Option<Arc<Self::Db>> {
self.namespace(name, span).await
}
async fn acquire_semaphore(&self, span: Option<Span>) -> InstrumentedAsyncOwnedSemaphorePermit {
Arc::clone(&self.query_execution_semaphore)
.acquire_owned(span)
.await
.expect("Semaphore should not be closed by anyone")
}
}
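Taken together, `acquire_semaphore` and `db` give the query service layer (in `service_common`) its gatekeeping recipe. A rough sketch with a hypothetical `db: &QuerierRpcWriteDatabase`, not the actual service code:

// Cap concurrency: hold a permit for the duration of the query.
let _permit = db.acquire_semaphore(None).await;

// Resolve the namespace only after the permit is held, per the doc
// comment on `namespace` above.
if let Some(namespace) = db.db("my_namespace", None).await {
    // plan and execute the query against `namespace` here
}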
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -22,7 +22,7 @@ use thiserror::Error;
use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use crate::{database::QuerierDatabase, poison::PoisonCabinet};
use crate::{database::Database, poison::PoisonCabinet};
#[derive(Debug, Error)]
#[allow(missing_copy_implementations, missing_docs)]
@@ -65,7 +65,7 @@ pub struct QuerierHandlerImpl {
catalog: Arc<dyn Catalog>,
/// Database that handles query operation
database: Arc<QuerierDatabase>,
database: Arc<dyn Database>,
/// The object store
object_store: Arc<dyn ObjectStore>,
@@ -85,7 +85,7 @@ impl QuerierHandlerImpl {
/// Initialize the Querier
pub fn new(
catalog: Arc<dyn Catalog>,
database: Arc<QuerierDatabase>,
database: Arc<dyn Database>,
object_store: Arc<dyn ObjectStore>,
) -> Self {
let shutdown = CancellationToken::new();

View File

@@ -170,6 +170,42 @@ pub fn create_ingester_connections_by_shard(
))
}
/// Create a new set of connections to ingester2 instances
pub fn create_ingester2_connection(
ingester_addrs: &[String],
catalog_cache: Arc<CatalogCache>,
open_circuit_after_n_errors: u64,
) -> Option<Arc<dyn IngesterConnection>> {
if ingester_addrs.is_empty() {
return None;
}
// This backoff config is used to retry requests for a specific table-scoped query.
let retry_backoff_config = BackoffConfig {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(1),
base: 3.0,
deadline: Some(Duration::from_secs(10)),
};
// This backoff config is used to half-open the circuit after it was opened. Circuits are
// ingester-scoped.
let circuit_breaker_backoff_config = BackoffConfig {
init_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(60),
base: 3.0,
deadline: None,
};
Some(Arc::new(IngesterRpcWriteConnection::new(
ingester_addrs,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
)))
}
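With the retry settings above, a failing table-scoped request is retried after roughly 100ms, 300ms, and 900ms, then capped at 1s between attempts until the 10s deadline; circuit half-opens use the same base, starting from 1s with no deadline. Wiring the connection up is a single call; a sketch, assuming a `catalog_cache: Arc<CatalogCache>` is already built:

// Fan out to two ingester2 instances; open an ingester's circuit after
// 10 consecutive errors (the CLI default).
let ingester_connection = create_ingester2_connection(
    &[
        "http://10.10.10.1:8083".to_string(),
        "http://10.10.10.2:8083".to_string(),
    ],
    Arc::clone(&catalog_cache),
    10,
);

// An empty address list yields None: the querier then answers from
// persisted data only.
assert!(create_ingester2_connection(&[], catalog_cache, 10).is_none());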
/// Create a new ingester suitable for testing
pub fn create_ingester_connection_for_testing() -> Arc<dyn IngesterConnection> {
Arc::new(MockIngesterConnection::new())
@@ -187,7 +223,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
#[allow(clippy::too_many_arguments)]
async fn partitions(
&self,
shard_indexes: &[ShardIndex],
shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
@@ -406,6 +442,160 @@ impl IngesterConnectionImpl {
}
}
/// Ingester connection that communicates with ingester2 instances that support the RPC write path.
#[derive(Debug)]
pub struct IngesterRpcWriteConnection {
ingester_addresses: Vec<Arc<str>>,
flight_client: Arc<dyn FlightClient>,
catalog_cache: Arc<CatalogCache>,
metrics: Arc<IngesterConnectionMetrics>,
backoff_config: BackoffConfig,
}
impl IngesterRpcWriteConnection {
fn new(
ingester_addrs: &[String],
catalog_cache: Arc<CatalogCache>,
backoff_config: BackoffConfig,
circuit_breaker_backoff_config: BackoffConfig,
open_circuit_after_n_errors: u64,
) -> Self {
let flight_client = Arc::new(FlightClientImpl::new());
let flight_client = Arc::new(CircuitBreakerFlightClient::new(
flight_client,
catalog_cache.time_provider(),
catalog_cache.metric_registry(),
open_circuit_after_n_errors,
circuit_breaker_backoff_config,
));
let metric_registry = catalog_cache.metric_registry();
let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry));
Self {
ingester_addresses: ingester_addrs.iter().map(|s| s.as_str().into()).collect(),
flight_client,
catalog_cache,
metrics,
backoff_config,
}
}
}
#[async_trait]
impl IngesterConnection for IngesterRpcWriteConnection {
/// Retrieve chunks from the ingester(s) for the particular table and predicate
async fn partitions(
&self,
// Shard indexes aren't relevant for the RPC write path. This should always be `None`, but
// regardless, this implementation ignores the value. When the RPC write path is the only
// implementation, this parameter can be removed.
_shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
predicate: &Predicate,
expected_schema: Arc<Schema>,
span: Option<Span>,
) -> Result<Vec<IngesterPartition>> {
let mut span_recorder = SpanRecorder::new(span);
let metrics = Arc::clone(&self.metrics);
let measured_ingester_request = |ingester_address: Arc<str>| {
let metrics = Arc::clone(&metrics);
let request = GetPartitionForIngester {
flight_client: Arc::clone(&self.flight_client),
catalog_cache: Arc::clone(&self.catalog_cache),
ingester_address: Arc::clone(&ingester_address),
namespace_id,
table_id,
columns: columns.clone(),
predicate,
expected_schema: Arc::clone(&expected_schema),
};
let backoff_config = self.backoff_config.clone();
// wrap `execute` into an additional future so that we can measure the request time
// INFO: create the measurement structure outside of the async block so cancellation is
// always measured
let measure_me = ObserveIngesterRequest::new(request.clone(), metrics, &span_recorder);
async move {
let span_recorder = measure_me
.span_recorder()
.child("ingester request (retry block)");
let res = Backoff::new(&backoff_config)
.retry_all_errors("ingester request", move || {
let request = request.clone();
let span_recorder = span_recorder.child("ingester request (single try)");
async move { execute(request, &span_recorder).await }
})
.await;
match &res {
Ok(partitions) => {
let mut status = IngesterResponseOk::default();
for p in partitions {
status.n_partitions += 1;
for c in p.chunks() {
status.n_chunks += 1;
status.n_rows += c.rows();
}
}
measure_me.set_ok(status);
}
Err(_) => measure_me.set_err(),
}
res
}
};
let mut ingester_partitions: Vec<IngesterPartition> = self
.ingester_addresses
.iter()
.cloned()
.map(move |ingester_address| measured_ingester_request(ingester_address))
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await
.map_err(|e| {
span_recorder.error("failed");
match e {
BackoffError::DeadlineExceeded { source, .. } => source,
}
})?
// We have a Vec<Vec<..>>; flatten it to a Vec<_>
.into_iter()
.flatten()
.collect();
ingester_partitions.sort_by_key(|p| p.partition_id);
span_recorder.ok("done");
Ok(ingester_partitions)
}
async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse> {
let responses = self
.ingester_addresses
.iter()
.map(|ingester_address| execute_get_write_infos(ingester_address, write_token))
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
Ok(merge_responses(responses))
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
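`partitions` above fans out to every configured ingester concurrently via `FuturesUnordered` and fails the whole request on the first non-retryable error. Stripped of metrics and tracing, the skeleton of that pattern looks like this (a toy stand-in for the per-ingester call, not IOx code):

use futures::{stream::FuturesUnordered, TryStreamExt};

async fn fan_out(addresses: &[String]) -> Result<Vec<usize>, std::io::Error> {
    addresses
        .iter()
        .map(|address| async move {
            // Stand-in for one ingester request; the real code runs
            // `execute` inside a retry/backoff loop here.
            Ok::<usize, std::io::Error>(address.len())
        })
        .collect::<FuturesUnordered<_>>()
        // Drive all requests concurrently; the first Err aborts the collect.
        .try_collect()
        .await
}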
/// Struct that names all parameters to `execute`
#[derive(Debug, Clone)]
struct GetPartitionForIngester<'a> {
@@ -726,7 +916,7 @@ impl IngesterConnection for IngesterConnectionImpl {
/// Retrieve chunks from the ingester for the particular table, shard, and predicate
async fn partitions(
&self,
shard_indexes: &[ShardIndex],
shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
@@ -736,6 +926,7 @@ impl IngesterConnection for IngesterConnectionImpl {
) -> Result<Vec<IngesterPartition>> {
// If no shard indexes are specified, no ingester addresses can be found. This is a
// configuration problem somewhere.
let shard_indexes = shard_indexes.expect("A `shard_indexes` list must be specified");
assert!(
!shard_indexes.is_empty(),
"Called `IngesterConnection.partitions` with an empty `shard_indexes` list",
@@ -802,7 +993,7 @@ impl IngesterConnection for IngesterConnectionImpl {
// shard_indexes relevant to this query.
let mut relevant_ingester_addresses = HashSet::new();
for shard_index in shard_indexes {
for shard_index in &shard_indexes {
match self.shard_to_ingesters.get(shard_index) {
None => {
return NoIngesterFoundForShardSnafu {

View File

@@ -34,7 +34,7 @@ impl MockIngesterConnection {
impl IngesterConnection for MockIngesterConnection {
async fn partitions(
&self,
_shard_indexes: &[ShardIndex],
_shard_indexes: Option<Vec<ShardIndex>>,
_namespace_id: NamespaceId,
_table_id: TableId,
columns: Vec<String>,

View File

@@ -25,10 +25,13 @@ mod table;
mod tombstone;
pub use cache::CatalogCache as QuerierCatalogCache;
pub use database::{Error as QuerierDatabaseError, QuerierDatabase};
pub use database::{
Database, Error as QuerierDatabaseError, QuerierDatabase, QuerierRpcWriteDatabase,
};
pub use handler::{QuerierHandler, QuerierHandlerImpl};
pub use ingester::{
create_ingester_connection_for_testing, create_ingester_connections_by_shard,
create_ingester2_connection, create_ingester_connection_for_testing,
create_ingester_connections_by_shard,
flight_client::{
Error as IngesterFlightClientError, FlightClient as IngesterFlightClient,
QueryData as IngesterFlightClientQueryData,

View File

@@ -58,7 +58,7 @@ impl QuerierNamespace {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
query_log: Arc<QueryLog>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
prune_metrics: Arc<PruneMetrics>,
) -> Self {
let tables: HashMap<_, _> = ns
@@ -66,7 +66,7 @@ impl QuerierNamespace {
.iter()
.map(|(table_name, cached_table)| {
let table = Arc::new(QuerierTable::new(QuerierTableArgs {
sharder: Arc::clone(&sharder),
sharder: sharder.clone(),
namespace_id: ns.id,
namespace_name: Arc::clone(&name),
namespace_retention_period: ns.retention_period,
@@ -118,7 +118,7 @@ impl QuerierNamespace {
exec,
ingester_connection,
query_log,
sharder,
Some(sharder),
prune_metrics,
)
}

View File

@@ -78,7 +78,7 @@ impl From<Error> for DataFusionError {
/// Args to create a [`QuerierTable`].
pub struct QuerierTableArgs {
pub sharder: Arc<JumpHash<Arc<ShardIndex>>>,
pub sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
pub namespace_id: NamespaceId,
pub namespace_name: Arc<str>,
pub namespace_retention_period: Option<Duration>,
@@ -94,8 +94,9 @@ pub struct QuerierTableArgs {
/// Table representation for the querier.
#[derive(Debug)]
pub struct QuerierTable {
/// Sharder to query for which shards are responsible for the table's data
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
/// Sharder to query for which shards are responsible for the table's data. If not specified,
/// query all ingesters.
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
/// Namespace the table is in
namespace_name: Arc<str>,
@@ -474,14 +475,15 @@ impl QuerierTable {
// determine which ingester(s) to query.
// Currently, the sharder will only return one shard index per table, but in the
// near future, the sharder might return more than one shard index for one table.
let shard_indexes = vec![**self
let shard_indexes = self
.sharder
.shard_for_query(&self.table_name, &self.namespace_name)];
.as_ref()
.map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]);
// get any chunks from the ingester(s)
let partitions_result = ingester_connection
.partitions(
&shard_indexes,
shard_indexes,
self.namespace_id,
self.table_id,
columns,