refactor: Make the querier server constructor more like other server constructors

pull/24376/head
Carol (Nichols || Goulding) 2022-06-09 16:05:52 -04:00
parent 6417e7dc2a
commit 148bc57e7b
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
6 changed files with 71 additions and 51 deletions

2
Cargo.lock generated
View File

@ -2526,6 +2526,7 @@ version = "0.1.0"
dependencies = [
"arrow-flight",
"async-trait",
"clap_blocks",
"data_types",
"generated_types",
"hyper",
@ -2540,6 +2541,7 @@ dependencies = [
"querier",
"service_grpc_flight",
"service_grpc_influxrpc",
"thiserror",
"tokio",
"tonic",
"trace",

View File

@ -1,6 +1,5 @@
use std::collections::BTreeSet;
use snafu::Snafu;
use std::collections::BTreeSet;
#[derive(Debug, Snafu)]
pub enum Error {

View File

@ -1,11 +1,14 @@
//! Implementation of command line option for running all in one mode
use super::main;
use clap_blocks::object_store::make_object_store;
use clap_blocks::querier::QuerierConfig;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, ingester::IngesterConfig,
object_store::ObjectStoreConfig, run_config::RunConfig, socket_addr::SocketAddr,
catalog_dsn::CatalogDsnConfig,
compactor::CompactorConfig,
ingester::IngesterConfig,
object_store::{make_object_store, ObjectStoreConfig},
querier::QuerierConfig,
run_config::RunConfig,
socket_addr::SocketAddr,
write_buffer::WriteBufferConfig,
};
use iox_query::exec::Executor;
@ -66,6 +69,9 @@ pub enum Error {
#[error("error initializing compactor: {0}")]
Compactor(#[from] ioxd_compactor::Error),
#[error("Querier error: {0}")]
Querier(#[from] ioxd_querier::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
}
@ -515,13 +521,13 @@ pub async fn command(config: Config) -> Result<()> {
metric_registry: Arc::clone(&metrics),
catalog,
object_store,
time_provider,
exec,
write_buffer_config: &write_buffer_config,
time_provider,
ingester_addresses,
ram_pool_bytes: querier_config.ram_pool_bytes(),
max_concurrent_queries: querier_config.max_concurrent_queries(),
querier_config,
})
.await;
.await?;
info!("starting all in one server");

View File

@ -1,22 +1,23 @@
//! Implementation of command line option for running the querier
use clap_blocks::querier::QuerierConfig;
use super::main;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig,
run_config::RunConfig, write_buffer::WriteBufferConfig,
};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs};
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
use clap_blocks::object_store::make_object_store;
use clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs};
use super::main;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
@ -36,6 +37,9 @@ pub enum Error {
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("Querier error: {0}")]
Querier(#[from] ioxd_querier::Error),
}
#[derive(Debug, clap::Parser)]
@ -87,27 +91,28 @@ pub async fn command(config: Config) -> Result<(), Error> {
let time_provider = Arc::new(SystemProvider::new());
let num_query_threads = config.querier_config.num_query_threads();
let ingester_addresses = config.querier_config.ingester_addresses()?;
let num_threads = num_query_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
let ingester_addresses = config.querier_config.ingester_addresses()?;
info!(?ingester_addresses, "using ingester addresses");
// Sharding needs to know about the write buffer sequencer_ids. For now, this is fake.
let write_buffer_config = WriteBufferConfig::new("iox-shared", None);
let exec = Arc::new(Executor::new(num_threads));
let ram_pool_bytes = config.querier_config.ram_pool_bytes();
let max_concurrent_queries = config.querier_config.max_concurrent_queries();
let server_type = create_querier_server_type(QuerierServerTypeArgs {
common_state: &common_state,
metric_registry: Arc::clone(&metric_registry),
catalog,
object_store,
time_provider,
exec,
write_buffer_config: &write_buffer_config,
time_provider,
ingester_addresses,
ram_pool_bytes,
max_concurrent_queries,
querier_config: config.querier_config,
})
.await;
.await?;
info!("starting querier");

View File

@ -3,10 +3,9 @@ name = "ioxd_querier"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
@ -24,12 +23,12 @@ trace = { path = "../trace" }
arrow-flight = "16.0.0"
async-trait = "0.1"
hyper = "0.14"
thiserror = "1.0.31"
tokio = { version = "1.19", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = "0.7"
workspace-hack = { path = "../workspace-hack"}
parquet_file = { version = "0.1.0", path = "../parquet_file" }
[dev-dependencies]
# Workspace dependencies, in alphabetical order
iox_tests = { path = "../iox_tests" }

View File

@ -1,22 +1,9 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use clap_blocks::{querier::QuerierConfig, write_buffer::WriteBufferConfig};
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use metric::Registry;
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;
use querier::{
create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
};
use trace::TraceCollector;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
@ -25,6 +12,19 @@ use ioxd_common::{
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
use metric::Registry;
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;
use querier::{
create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
};
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use thiserror::Error;
use trace::TraceCollector;
mod rpc;
@ -143,20 +143,25 @@ pub struct QuerierServerTypeArgs<'a> {
pub metric_registry: Arc<metric::Registry>,
pub catalog: Arc<dyn Catalog>,
pub object_store: Arc<DynObjectStore>,
pub time_provider: Arc<dyn TimeProvider>,
pub exec: Arc<Executor>,
pub write_buffer_config: &'a WriteBufferConfig,
pub time_provider: Arc<dyn TimeProvider>,
pub ingester_addresses: Vec<String>,
pub ram_pool_bytes: usize,
pub max_concurrent_queries: usize,
pub querier_config: QuerierConfig,
}
#[derive(Debug, Error)]
pub enum Error {}
/// Instantiate a querier server
pub async fn create_querier_server_type(args: QuerierServerTypeArgs<'_>) -> Arc<dyn ServerType> {
pub async fn create_querier_server_type(
args: QuerierServerTypeArgs<'_>,
) -> Result<Arc<dyn ServerType>, Error> {
let catalog_cache = Arc::new(QuerierCatalogCache::new(
Arc::clone(&args.catalog),
args.time_provider,
Arc::clone(&args.metric_registry),
args.ram_pool_bytes,
args.querier_config.ram_pool_bytes(),
));
let ingester_connection =
create_ingester_connection(args.ingester_addresses, Arc::clone(&catalog_cache));
@ -166,10 +171,14 @@ pub async fn create_querier_server_type(args: QuerierServerTypeArgs<'_>) -> Arc<
ParquetStorage::new(args.object_store),
args.exec,
ingester_connection,
args.max_concurrent_queries,
args.querier_config.max_concurrent_queries(),
));
let querier_handler = Arc::new(QuerierHandlerImpl::new(args.catalog, Arc::clone(&database)));
let querier = QuerierServer::new(args.metric_registry, querier_handler);
Arc::new(QuerierServerType::new(querier, database, args.common_state))
Ok(Arc::new(QuerierServerType::new(
querier,
database,
args.common_state,
)))
}