From e9cdaffe74dc9b28439a2bd13a3b136bfa56d74c Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 13 Jun 2022 12:15:10 -0400 Subject: [PATCH] fix: Create querier sharder from catalog sequencer info Panic if there are no sharders in the catalog. --- Cargo.lock | 1 + clap_blocks/src/write_buffer.rs | 92 ------------------ influxdb_iox/src/commands/run/all_in_one.rs | 2 - influxdb_iox/src/commands/run/querier.rs | 6 +- iox_tests/Cargo.toml | 1 + iox_tests/src/util.rs | 21 +++- ioxd_querier/src/lib.rs | 64 +++---------- ioxd_querier/src/rpc/namespace.rs | 51 ++++++---- querier/src/database.rs | 101 ++++++++++++++++---- querier/src/handler.rs | 52 ++++++---- querier/src/lib.rs | 2 +- 11 files changed, 188 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b44c3734e..27b12ccb60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2422,6 +2422,7 @@ dependencies = [ "observability_deps", "parquet_file", "schema", + "sharder", "uuid 0.8.2", "workspace-hack", ] diff --git a/clap_blocks/src/write_buffer.rs b/clap_blocks/src/write_buffer.rs index 5d9e70ad1a..b050c7a081 100644 --- a/clap_blocks/src/write_buffer.rs +++ b/clap_blocks/src/write_buffer.rs @@ -63,57 +63,6 @@ pub struct WriteBufferConfig { pub(crate) auto_create_topics: Option, } -/// For use by the querier. If these options are specified, the querier can use the same sharding -/// that the router uses to know the subset of ingesters to query. If these options are not -/// specified, the querier will ask all ingesters. -#[derive(Debug, clap::Parser)] -pub struct OptionalWriteBufferConfig { - /// The type of write buffer to use. - /// - /// Valid options are: file, kafka - #[clap(long = "--write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE")] - pub(crate) type_: Option, - - /// The address to the write buffer. 
- #[clap(long = "--write-buffer-addr", env = "INFLUXDB_IOX_WRITE_BUFFER_ADDR")] - pub(crate) connection_string: Option, - - /// Write buffer topic/database that should be used. - #[clap( - long = "--write-buffer-topic", - env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC", - default_value = "iox-shared" - )] - pub(crate) topic: String, - - /// Write buffer connection config. - /// - /// The concrete options depend on the write buffer type. - /// - /// Command line arguments are passed as - /// `--write-buffer-connection-config key1=value1 key2=value2` or - /// `--write-buffer-connection-config key1=value1,key2=value2`. - /// - /// Environment variables are passed as `key1=value1,key2=value2,...`. - #[clap( - long = "--write-buffer-connection-config", - env = "INFLUXDB_IOX_WRITE_BUFFER_CONNECTION_CONFIG", - default_value = "", - multiple_values = true, - use_value_delimiter = true, - action - )] - pub(crate) connection_config: Vec, - - /// The number of topics to create automatically, if any. Default is to not create any topics. - #[clap( - long = "--write-buffer-auto-create-topics", - env = "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS", - action - )] - pub(crate) auto_create_topics: Option, -} - impl WriteBufferConfig { /// Create a new instance for all-in-one mode, only allowing some arguments. /// If `database_directory` is not specified, creates a new temporary directory. 
@@ -218,47 +167,6 @@ impl WriteBufferConfig { } } -impl From for OptionalWriteBufferConfig { - fn from(write_buffer_config: WriteBufferConfig) -> Self { - Self { - type_: Some(write_buffer_config.type_.clone()), - connection_string: Some(write_buffer_config.connection_string.clone()), - topic: write_buffer_config.topic.clone(), - connection_config: write_buffer_config.connection_config.clone(), - auto_create_topics: write_buffer_config.auto_create_topics, - } - } -} - -impl OptionalWriteBufferConfig { - /// Initialize a [`WriteBufferWriting`] if the options are specified - pub async fn writing( - &self, - metrics: Arc, - trace_collector: Option>, - ) -> Result>, WriteBufferError> { - match (self.type_.as_ref(), self.connection_string.as_ref()) { - (Some(type_), Some(connection_string)) => { - let write_buffer_config = WriteBufferConfig { - type_: type_.to_string(), - connection_string: connection_string.to_string(), - topic: self.topic.clone(), - connection_config: self.connection_config.clone(), - auto_create_topics: self.auto_create_topics, - }; - let conn = write_buffer_config.conn(); - let factory = WriteBufferConfig::factory(metrics); - Ok(Some( - factory - .new_config_write(&self.topic, trace_collector.as_ref(), &conn) - .await?, - )) - } - _ => Ok(None), - } - } -} - #[cfg(test)] mod tests { use clap::StructOpt; diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index e3651a0055..9fa3e623b0 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -516,14 +516,12 @@ pub async fn command(config: Config) -> Result<()> { let ingester_addresses = vec![format!("http://{}", ingester_run_config.grpc_bind_address)]; info!(?ingester_addresses, "starting querier"); - let optional_write_buffer_config = write_buffer_config.into(); let querier = create_querier_server_type(QuerierServerTypeArgs { common_state: &common_state, metric_registry: Arc::clone(&metrics), 
catalog, object_store, exec, - optional_write_buffer_config: &optional_write_buffer_config, time_provider, ingester_addresses, querier_config, diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index 0da799d935..cfa478bf99 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -3,7 +3,7 @@ use super::main; use clap_blocks::{ catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig, - run_config::RunConfig, write_buffer::OptionalWriteBufferConfig, + run_config::RunConfig, }; use iox_query::exec::Executor; use iox_time::{SystemProvider, TimeProvider}; @@ -66,9 +66,6 @@ pub struct Config { #[clap(flatten)] pub(crate) querier_config: QuerierConfig, - - #[clap(flatten)] - pub(crate) optional_write_buffer_config: OptionalWriteBufferConfig, } pub async fn command(config: Config) -> Result<(), Error> { @@ -108,7 +105,6 @@ pub async fn command(config: Config) -> Result<(), Error> { catalog, object_store, exec, - optional_write_buffer_config: &config.optional_write_buffer_config, time_provider, ingester_addresses, querier_config: config.querier_config, diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index 0cb0e981a1..246e8b34d1 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -19,6 +19,7 @@ observability_deps = { path = "../observability_deps" } parquet_file = { path = "../parquet_file" } iox_query = { path = "../iox_query" } schema = { path = "../schema" } +sharder = { path = "../sharder" } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} futures = "0.3.21" diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index c2748cb653..dcbfa1fbac 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -91,7 +91,26 @@ impl TestCatalog { Arc::clone(&self.exec) } - /// Create a namesapce in teh catalog + /// Create a sequencer in the catalog + pub async fn create_sequencer(self: 
&Arc, sequencer: i32) -> Arc { + let mut repos = self.catalog.repositories().await; + + let kafka_topic = repos + .kafka_topics() + .create_or_get("kafka_topic") + .await + .unwrap(); + let kafka_partition = KafkaPartition::new(sequencer); + Arc::new( + repos + .sequencers() + .create_or_get(&kafka_topic, kafka_partition) + .await + .unwrap(), + ) + } + + /// Create a namespace in the catalog pub async fn create_namespace(self: &Arc, name: &str) -> Arc { let mut repos = self.catalog.repositories().await; diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index 26ec0c51bc..c08e2f04ce 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; -use clap_blocks::{querier::QuerierConfig, write_buffer::OptionalWriteBufferConfig}; -use data_types::KafkaPartition; +use clap_blocks::querier::QuerierConfig; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -20,9 +19,7 @@ use querier::{ create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler, QuerierHandlerImpl, QuerierServer, }; -use sharder::JumpHash; use std::{ - collections::BTreeSet, fmt::{Debug, Display}, sync::Arc, }; @@ -147,7 +144,6 @@ pub struct QuerierServerTypeArgs<'a> { pub catalog: Arc, pub object_store: Arc, pub exec: Arc, - pub optional_write_buffer_config: &'a OptionalWriteBufferConfig, pub time_provider: Arc, pub ingester_addresses: Vec, pub querier_config: QuerierConfig, @@ -160,6 +156,9 @@ pub enum Error { #[error("failed to create KafkaPartition from id: {0}")] InvalidData(#[from] std::num::TryFromIntError), + + #[error("querier error: {0}")] + Querier(#[from] querier::QuerierDatabaseError), } /// Instantiate a querier server @@ -176,22 +175,17 @@ pub async fn create_querier_server_type( let ingester_connection = create_ingester_connection(args.ingester_addresses, Arc::clone(&catalog_cache)); - let sharder = maybe_sharder( -
args.optional_write_buffer_config, - Arc::clone(&args.metric_registry), - args.common_state.trace_collector(), - ) - .await?; - - let database = Arc::new(QuerierDatabase::new( - catalog_cache, - Arc::clone(&args.metric_registry), - ParquetStorage::new(args.object_store), - args.exec, - ingester_connection, - args.querier_config.max_concurrent_queries(), - sharder, - )); + let database = Arc::new( + QuerierDatabase::new( + catalog_cache, + Arc::clone(&args.metric_registry), + ParquetStorage::new(args.object_store), + args.exec, + ingester_connection, + args.querier_config.max_concurrent_queries(), + ) + .await?, + ); let querier_handler = Arc::new(QuerierHandlerImpl::new(args.catalog, Arc::clone(&database))); let querier = QuerierServer::new(args.metric_registry, querier_handler); @@ -201,31 +195,3 @@ pub async fn create_querier_server_type( args.common_state, ))) } - -pub async fn maybe_sharder( - optional_write_buffer_config: &OptionalWriteBufferConfig, - metric_registry: Arc, - trace_collector: Option>, -) -> Result>>, Error> { - optional_write_buffer_config - .writing(metric_registry, trace_collector) - .await? - .map(|write_buffer| { - // Construct the (ordered) set of sequencers. - // - // The sort order must be deterministic in order for all nodes to shard to - // the same sequencers, therefore we type assert the returned set is of the - // ordered variety. - let shards: BTreeSet<_> = write_buffer.sequencer_ids(); - // ^ don't change this to an unordered set - - Ok(shards - .into_iter() - .map(|id| Ok(KafkaPartition::new(id.try_into()?))) - .collect::, Error>>()? 
- .into_iter() - .map(Arc::new) - .collect::>()) - }) - .transpose() -} diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 19ad8d1b52..b2172e63bd 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -53,32 +53,37 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl #[cfg(test)] mod tests { + use super::*; use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService; use iox_tests::util::TestCatalog; use parquet_file::storage::ParquetStorage; use querier::{create_ingester_connection_for_testing, QuerierCatalogCache}; - use super::*; - #[tokio::test] async fn test_get_namespaces_empty() { let catalog = TestCatalog::new(); + // QuerierDatabase::new panics if there are no sequencers in the catalog + catalog.create_sequencer(0).await; + let catalog_cache = Arc::new(QuerierCatalogCache::new( catalog.catalog(), catalog.time_provider(), catalog.metric_registry(), usize::MAX, )); - let db = Arc::new(QuerierDatabase::new( - catalog_cache, - catalog.metric_registry(), - ParquetStorage::new(catalog.object_store()), - catalog.exec(), - create_ingester_connection_for_testing(), - QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, - None, - )); + let db = Arc::new( + QuerierDatabase::new( + catalog_cache, + catalog.metric_registry(), + ParquetStorage::new(catalog.object_store()), + catalog.exec(), + create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + ) + .await + .unwrap(), + ); let service = NamespaceServiceImpl::new(db); @@ -93,21 +98,27 @@ mod tests { async fn test_get_namespaces() { let catalog = TestCatalog::new(); + // QuerierDatabase::new panics if there are no sequencers in the catalog + catalog.create_sequencer(0).await; + let catalog_cache = Arc::new(QuerierCatalogCache::new( catalog.catalog(), catalog.time_provider(), catalog.metric_registry(), usize::MAX, )); - let db = 
Arc::new(QuerierDatabase::new( - catalog_cache, - catalog.metric_registry(), - ParquetStorage::new(catalog.object_store()), - catalog.exec(), - create_ingester_connection_for_testing(), - QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, - None, - )); + let db = Arc::new( + QuerierDatabase::new( + catalog_cache, + catalog.metric_registry(), + ParquetStorage::new(catalog.object_store()), + catalog.exec(), + create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + ) + .await + .unwrap(), + ); let service = NamespaceServiceImpl::new(db); catalog.create_namespace("namespace2").await; diff --git a/querier/src/database.rs b/querier/src/database.rs index 401df5d2e0..cacfcf2dc0 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -7,11 +7,13 @@ use crate::{ use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; use data_types::{KafkaPartition, Namespace}; +use iox_catalog::interface::Catalog; use iox_query::exec::Executor; use parquet_file::storage::ParquetStorage; use service_common::QueryDatabaseProvider; use sharder::JumpHash; -use std::sync::Arc; +use snafu::{ResultExt, Snafu}; +use std::{collections::BTreeSet, sync::Arc}; use tracker::{ AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, }; @@ -21,6 +23,15 @@ use tracker::{ /// That buffer is shared between all namespaces, and filtered on query const QUERY_LOG_SIZE: usize = 10_000; +#[allow(missing_docs)] +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Catalog error: {source}"))] + Catalog { + source: iox_catalog::interface::Error, + }, +} + /// Database for the querier. /// /// Contains all namespaces. @@ -55,9 +66,8 @@ pub struct QuerierDatabase { /// If the same database is requested twice for different queries, it is counted twice. query_execution_semaphore: Arc, - /// Optional sharder to determine which ingesters to query for a particular table and - /// namespace. 
If not specified, all ingesters will be queried. - _sharder: Option>>, + /// Sharder to determine which ingesters to query for a particular table and namespace. + _sharder: JumpHash>, } #[async_trait] @@ -85,15 +95,14 @@ impl QuerierDatabase { pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize; /// Create new database. - pub fn new( + pub async fn new( catalog_cache: Arc, metric_registry: Arc, store: ParquetStorage, exec: Arc, ingester_connection: Arc, max_concurrent_queries: usize, - sharder: Option>>, - ) -> Self { + ) -> Result { assert!( max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX, "`max_concurrent_queries` ({}) > `max_concurrent_queries_MAX` ({})", @@ -115,7 +124,9 @@ impl QuerierDatabase { let query_execution_semaphore = Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries)); - Self { + let _sharder = create_sharder(catalog_cache.catalog().as_ref()).await?; + + Ok(Self { backoff_config: BackoffConfig::default(), catalog_cache, chunk_adapter, @@ -124,8 +135,8 @@ impl QuerierDatabase { ingester_connection, query_log, query_execution_semaphore, - _sharder: sharder, - } + _sharder, + }) } /// Get namespace if it exists. @@ -171,6 +182,29 @@ impl QuerierDatabase { } } +pub async fn create_sharder(catalog: &dyn Catalog) -> Result>, Error> { + let sequencers = catalog + .repositories() + .await + .sequencers() + .list() + .await + .context(CatalogSnafu)?; + + // Construct the (ordered) set of sequencers. + // + // The sort order must be deterministic in order for all nodes to shard to + // the same sequencers, therefore we type assert the returned set is of the + // ordered variety. 
+ let shards: BTreeSet<_> = sequencers + // ^ don't change this to an unordered set + .into_iter() + .map(|sequencer| sequencer.kafka_partition) + .collect(); + + Ok(shards.into_iter().map(Arc::new).collect()) +} + #[cfg(test)] mod tests { use iox_tests::util::TestCatalog; @@ -179,11 +213,11 @@ mod tests { use super::*; - #[test] + #[tokio::test] #[should_panic( expected = "`max_concurrent_queries` (65536) > `max_concurrent_queries_MAX` (65535)" )] - fn test_semaphore_limit_is_checked() { + async fn test_semaphore_limit_is_checked() { let catalog = TestCatalog::new(); let catalog_cache = Arc::new(CatalogCache::new( @@ -199,13 +233,40 @@ mod tests { catalog.exec(), create_ingester_connection_for_testing(), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1), - None, - ); + ) + .await + .unwrap(); + } + + #[tokio::test] + #[should_panic(expected = "cannot initialise sharder with no shards")] + async fn sequencers_in_catalog_are_required_for_startup() { + let catalog = TestCatalog::new(); + + let catalog_cache = Arc::new(CatalogCache::new( + catalog.catalog(), + catalog.time_provider(), + catalog.metric_registry(), + usize::MAX, + )); + + QuerierDatabase::new( + catalog_cache, + catalog.metric_registry(), + ParquetStorage::new(catalog.object_store()), + catalog.exec(), + create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + ) + .await + .unwrap(); } #[tokio::test] async fn test_namespace() { let catalog = TestCatalog::new(); + // QuerierDatabase::new panics if there are no sequencers in the catalog + catalog.create_sequencer(0).await; let catalog_cache = Arc::new(CatalogCache::new( catalog.catalog(), @@ -220,8 +281,9 @@ mod tests { catalog.exec(), create_ingester_connection_for_testing(), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, - None, - ); + ) + .await + .unwrap(); catalog.create_namespace("ns1").await; @@ -232,6 +294,8 @@ mod tests { #[tokio::test] async fn test_namespaces() { let catalog = TestCatalog::new(); + // 
QuerierDatabase::new panics if there are no sequencers in the catalog + catalog.create_sequencer(0).await; let catalog_cache = Arc::new(CatalogCache::new( catalog.catalog(), @@ -246,8 +310,9 @@ mod tests { catalog.exec(), create_ingester_connection_for_testing(), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, - None, - ); + ) + .await + .unwrap(); catalog.create_namespace("ns1").await; catalog.create_namespace("ns2").await; diff --git a/querier/src/handler.rs b/querier/src/handler.rs index 2f75b3482f..7e1dc93a5c 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -128,21 +128,19 @@ impl Drop for QuerierHandlerImpl { #[cfg(test)] mod tests { - use std::time::Duration; - + use super::*; + use crate::{cache::CatalogCache, create_ingester_connection_for_testing}; + use data_types::KafkaPartition; use iox_catalog::mem::MemCatalog; use iox_query::exec::Executor; use iox_time::{MockProvider, Time}; use object_store::memory::InMemory; use parquet_file::storage::ParquetStorage; - - use crate::{cache::CatalogCache, create_ingester_connection_for_testing}; - - use super::*; + use std::time::Duration; #[tokio::test] async fn test_shutdown() { - let querier = TestQuerier::new().querier; + let querier = TestQuerier::new().await.querier; // does not exit w/o shutdown tokio::select! 
{ @@ -162,7 +160,7 @@ mod tests { } impl TestQuerier { - fn new() -> Self { + async fn new() -> Self { let metric_registry = Arc::new(metric::Registry::new()); let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry))) as _; let object_store = Arc::new(InMemory::new()); @@ -174,15 +172,35 @@ mod tests { Arc::clone(&metric_registry), usize::MAX, )); - let database = Arc::new(QuerierDatabase::new( - catalog_cache, - metric_registry, - ParquetStorage::new(object_store), - exec, - create_ingester_connection_for_testing(), - QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, - None, - )); + // QuerierDatabase::new panics if there are no sequencers in the catalog + { + let mut repos = catalog.repositories().await; + + let kafka_topic = repos + .kafka_topics() + .create_or_get("kafka_topic") + .await + .unwrap(); + let kafka_partition = KafkaPartition::new(0); + repos + .sequencers() + .create_or_get(&kafka_topic, kafka_partition) + .await + .unwrap(); + } + + let database = Arc::new( + QuerierDatabase::new( + catalog_cache, + metric_registry, + ParquetStorage::new(object_store), + exec, + create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + ) + .await + .unwrap(), + ); let querier = QuerierHandlerImpl::new(catalog, database); Self { querier } diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 8abb6390d4..d61730dbd7 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -23,7 +23,7 @@ mod table; mod tombstone; pub use cache::CatalogCache as QuerierCatalogCache; -pub use database::QuerierDatabase; +pub use database::{Error as QuerierDatabaseError, QuerierDatabase}; pub use handler::{QuerierHandler, QuerierHandlerImpl}; pub use ingester::{ create_ingester_connection, create_ingester_connection_for_testing,