diff --git a/Cargo.lock b/Cargo.lock
index f78b956be7..6b44c3734e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3960,6 +3960,7 @@ dependencies = [
  "schema",
  "service_common",
  "service_grpc_schema",
+ "sharder",
  "snafu",
  "test_helpers",
  "thiserror",
diff --git a/clap_blocks/src/write_buffer.rs b/clap_blocks/src/write_buffer.rs
index 24932dce0a..5d9e70ad1a 100644
--- a/clap_blocks/src/write_buffer.rs
+++ b/clap_blocks/src/write_buffer.rs
@@ -41,7 +41,57 @@ pub struct WriteBufferConfig {
     ///
     /// The concrete options depend on the write buffer type.
     ///
-    /// Command line arguments are passed as `--write-buffer-connection-config key1=value1 key2=value2` or
+    /// Command line arguments are passed as
+    /// `--write-buffer-connection-config key1=value1 key2=value2` or
+    /// `--write-buffer-connection-config key1=value1,key2=value2`.
+    ///
+    /// Environment variables are passed as `key1=value1,key2=value2,...`.
+    #[clap(
+        long = "--write-buffer-connection-config",
+        env = "INFLUXDB_IOX_WRITE_BUFFER_CONNECTION_CONFIG",
+        default_value = "",
+        multiple_values = true,
+        use_value_delimiter = true
+    )]
+    pub(crate) connection_config: Vec<String>,
+
+    /// The number of topics to create automatically, if any. Default is to not create any topics.
+    #[clap(
+        long = "--write-buffer-auto-create-topics",
+        env = "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS"
+    )]
+    pub(crate) auto_create_topics: Option<NonZeroU32>,
+}
+
+/// For use by the querier. If these options are specified, the querier can use the same sharding
+/// that the router uses to know the subset of ingesters to query. If these options are not
+/// specified, the querier will ask all ingesters.
+#[derive(Debug, clap::Parser)]
+pub struct OptionalWriteBufferConfig {
+    /// The type of write buffer to use.
+    ///
+    /// Valid options are: file, kafka
+    #[clap(long = "--write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE")]
+    pub(crate) type_: Option<String>,
+
+    /// The address to the write buffer.
+    #[clap(long = "--write-buffer-addr", env = "INFLUXDB_IOX_WRITE_BUFFER_ADDR")]
+    pub(crate) connection_string: Option<String>,
+
+    /// Write buffer topic/database that should be used.
+    #[clap(
+        long = "--write-buffer-topic",
+        env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
+        default_value = "iox-shared"
+    )]
+    pub(crate) topic: String,
+
+    /// Write buffer connection config.
+    ///
+    /// The concrete options depend on the write buffer type.
+    ///
+    /// Command line arguments are passed as
+    /// `--write-buffer-connection-config key1=value1 key2=value2` or
     /// `--write-buffer-connection-config key1=value1,key2=value2`.
     ///
     /// Environment variables are passed as `key1=value1,key2=value2,...`.
@@ -168,6 +218,47 @@ impl WriteBufferConfig {
     }
 }
 
+impl From<WriteBufferConfig> for OptionalWriteBufferConfig {
+    fn from(write_buffer_config: WriteBufferConfig) -> Self {
+        Self {
+            type_: Some(write_buffer_config.type_.clone()),
+            connection_string: Some(write_buffer_config.connection_string.clone()),
+            topic: write_buffer_config.topic.clone(),
+            connection_config: write_buffer_config.connection_config.clone(),
+            auto_create_topics: write_buffer_config.auto_create_topics,
+        }
+    }
+}
+
+impl OptionalWriteBufferConfig {
+    /// Initialize a [`WriteBufferWriting`] if the options are specified
+    pub async fn writing(
+        &self,
+        metrics: Arc<metric::Registry>,
+        trace_collector: Option<Arc<dyn TraceCollector>>,
+    ) -> Result<Option<Arc<dyn WriteBufferWriting>>, WriteBufferError> {
+        match (self.type_.as_ref(), self.connection_string.as_ref()) {
+            (Some(type_), Some(connection_string)) => {
+                let write_buffer_config = WriteBufferConfig {
+                    type_: type_.to_string(),
+                    connection_string: connection_string.to_string(),
+                    topic: self.topic.clone(),
+                    connection_config: self.connection_config.clone(),
+                    auto_create_topics: self.auto_create_topics,
+                };
+                let conn = write_buffer_config.conn();
+                let factory = WriteBufferConfig::factory(metrics);
+                Ok(Some(
+                    factory
+                        .new_config_write(&self.topic, trace_collector.as_ref(), &conn)
+                        .await?,
+                ))
+            }
+            _ => Ok(None),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use clap::StructOpt;
diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs
index 65f29d9394..e3651a0055 100644
--- a/influxdb_iox/src/commands/run/all_in_one.rs
+++ b/influxdb_iox/src/commands/run/all_in_one.rs
@@ -516,13 +516,14 @@ pub async fn command(config: Config) -> Result<()> {
     let ingester_addresses = vec![format!("http://{}", ingester_run_config.grpc_bind_address)];
 
     info!(?ingester_addresses, "starting querier");
+    let optional_write_buffer_config = write_buffer_config.into();
     let querier = create_querier_server_type(QuerierServerTypeArgs {
         common_state: &common_state,
         metric_registry: Arc::clone(&metrics),
         catalog,
         object_store,
         exec,
-        write_buffer_config: &write_buffer_config,
+        optional_write_buffer_config: &optional_write_buffer_config,
         time_provider,
         ingester_addresses,
         querier_config,
diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs
index ab9ad560ee..0da799d935 100644
--- a/influxdb_iox/src/commands/run/querier.rs
+++ b/influxdb_iox/src/commands/run/querier.rs
@@ -3,7 +3,7 @@ use super::main;
 
 use clap_blocks::{
     catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig,
-    run_config::RunConfig, write_buffer::WriteBufferConfig,
+    run_config::RunConfig, write_buffer::OptionalWriteBufferConfig,
 };
 use iox_query::exec::Executor;
 use iox_time::{SystemProvider, TimeProvider};
@@ -68,7 +68,7 @@ pub struct Config {
     pub(crate) querier_config: QuerierConfig,
 
     #[clap(flatten)]
-    pub(crate) write_buffer_config: WriteBufferConfig,
+    pub(crate) optional_write_buffer_config: OptionalWriteBufferConfig,
 }
 
 pub async fn command(config: Config) -> Result<(), Error> {
@@ -100,9 +100,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let ingester_addresses = config.querier_config.ingester_addresses()?;
     info!(?ingester_addresses, "using ingester addresses");
 
-    // Sharding needs to know about the write buffer sequencer_ids. For now, this is fake.
-    let write_buffer_config = WriteBufferConfig::new("iox-shared", None);
-
     let exec = Arc::new(Executor::new(num_threads));
 
     let server_type = create_querier_server_type(QuerierServerTypeArgs {
@@ -111,7 +108,7 @@
         catalog,
         object_store,
         exec,
-        write_buffer_config: &write_buffer_config,
+        optional_write_buffer_config: &config.optional_write_buffer_config,
         time_provider,
         ingester_addresses,
         querier_config: config.querier_config,
diff --git a/influxdb_iox/tests/end_to_end_cases/logging.rs b/influxdb_iox/tests/end_to_end_cases/logging.rs
index 5b29c29297..376c7c344d 100644
--- a/influxdb_iox/tests/end_to_end_cases/logging.rs
+++ b/influxdb_iox/tests/end_to_end_cases/logging.rs
@@ -4,17 +4,10 @@ use std::time::Duration;
 
 #[tokio::test]
 async fn test_logging() {
-    // Testing with querier mode because it has the least amount of setup needed.
+    // Testing with all-in-one mode because it has the least amount of setup needed.
     Command::cargo_bin("influxdb_iox")
         .unwrap()
-        .args(&[
-            "run",
-            "querier",
-            "--log-filter",
-            "info",
-            "--catalog",
-            "memory",
-        ])
+        .args(&["run", "all-in-one", "--log-filter", "info"])
         .timeout(Duration::from_secs(2))
         .assert()
         .failure()
@@ -25,14 +18,7 @@
 
     Command::cargo_bin("influxdb_iox")
         .unwrap()
-        .args(&[
-            "run",
-            "querier",
-            "--log-filter",
-            "error",
-            "--catalog",
-            "memory",
-        ])
+        .args(&["run", "all-in-one", "--log-filter", "error"])
         .timeout(Duration::from_secs(2))
         .assert()
         .failure()
diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs
index 4343dcd040..26ec0c51bc 100644
--- a/ioxd_querier/src/lib.rs
+++ b/ioxd_querier/src/lib.rs
@@ -1,5 +1,6 @@
 use async_trait::async_trait;
-use clap_blocks::{querier::QuerierConfig, write_buffer::WriteBufferConfig};
+use clap_blocks::{querier::QuerierConfig, write_buffer::OptionalWriteBufferConfig};
+use data_types::KafkaPartition;
 use hyper::{Body, Request, Response};
 use iox_catalog::interface::Catalog;
 use iox_query::exec::Executor;
@@ -19,7 +20,6 @@ use querier::{
     create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
     QuerierHandlerImpl, QuerierServer,
 };
-use router::sequencer::Sequencer;
 use sharder::JumpHash;
 use std::{
     collections::BTreeSet,
@@ -147,7 +147,7 @@ pub struct QuerierServerTypeArgs<'a> {
     pub catalog: Arc<dyn Catalog>,
     pub object_store: Arc<DynObjectStore>,
     pub exec: Arc<Executor>,
-    pub write_buffer_config: &'a WriteBufferConfig,
+    pub optional_write_buffer_config: &'a OptionalWriteBufferConfig,
     pub time_provider: Arc<dyn TimeProvider>,
     pub ingester_addresses: Vec<String>,
     pub querier_config: QuerierConfig,
@@ -157,6 +157,9 @@
 pub enum Error {
     #[error("failed to initialise write buffer connection: {0}")]
     WriteBuffer(#[from] write_buffer::core::WriteBufferError),
+
+    #[error("failed to create KafkaPartition from id: {0}")]
+    InvalidData(#[from] std::num::TryFromIntError),
 }
 
 /// Instantiate a querier server
@@ -173,27 +176,12 @@ pub async fn create_querier_server_type(
     let ingester_connection =
         create_ingester_connection(args.ingester_addresses, Arc::clone(&catalog_cache));
 
-    let write_buffer = Arc::new(
-        args.write_buffer_config
-            .writing(
-                Arc::clone(&args.metric_registry),
-                args.common_state.trace_collector(),
-            )
-            .await?,
-    );
-    // Construct the (ordered) set of sequencers.
-    //
-    // The sort order must be deterministic in order for all nodes to shard to
-    // the same sequencers, therefore we type assert the returned set is of the
-    // ordered variety.
-    let shards: BTreeSet<_> = write_buffer.sequencer_ids();
-    // ^ don't change this to an unordered set
-
-    let _sharder: JumpHash<_> = shards
-        .into_iter()
-        .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &args.metric_registry))
-        .map(Arc::new)
-        .collect();
+    let sharder = maybe_sharder(
+        args.optional_write_buffer_config,
+        Arc::clone(&args.metric_registry),
+        args.common_state.trace_collector(),
+    )
+    .await?;
 
     let database = Arc::new(QuerierDatabase::new(
         catalog_cache,
@@ -202,6 +190,7 @@ pub async fn create_querier_server_type(
         args.exec,
         ingester_connection,
         args.querier_config.max_concurrent_queries(),
+        sharder,
     ));
 
     let querier_handler = Arc::new(QuerierHandlerImpl::new(args.catalog, Arc::clone(&database)));
@@ -212,3 +201,31 @@ pub async fn create_querier_server_type(
         args.common_state,
     )))
 }
+
+pub async fn maybe_sharder(
+    optional_write_buffer_config: &OptionalWriteBufferConfig,
+    metric_registry: Arc<metric::Registry>,
+    trace_collector: Option<Arc<dyn TraceCollector>>,
+) -> Result<Option<JumpHash<Arc<KafkaPartition>>>, Error> {
+    optional_write_buffer_config
+        .writing(metric_registry, trace_collector)
+        .await?
+        .map(|write_buffer| {
+            // Construct the (ordered) set of sequencers.
+            //
+            // The sort order must be deterministic in order for all nodes to shard to
+            // the same sequencers, therefore we type assert the returned set is of the
+            // ordered variety.
+            let shards: BTreeSet<_> = write_buffer.sequencer_ids();
+            // ^ don't change this to an unordered set
+
+            Ok(shards
+                .into_iter()
+                .map(|id| Ok(KafkaPartition::new(id.try_into()?)))
+                .collect::<Result<Vec<_>, Error>>()?
+                .into_iter()
+                .map(Arc::new)
+                .collect::<JumpHash<_>>())
+        })
+        .transpose()
+}
diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs
index 089418ce6d..19ad8d1b52 100644
--- a/ioxd_querier/src/rpc/namespace.rs
+++ b/ioxd_querier/src/rpc/namespace.rs
@@ -77,6 +77,7 @@ mod tests {
             catalog.exec(),
             create_ingester_connection_for_testing(),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            None,
         ));
 
         let service = NamespaceServiceImpl::new(db);
@@ -105,6 +106,7 @@
             catalog.exec(),
             create_ingester_connection_for_testing(),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            None,
         ));
 
         let service = NamespaceServiceImpl::new(db);
diff --git a/querier/Cargo.toml b/querier/Cargo.toml
index 0143b88de9..460f335b34 100644
--- a/querier/Cargo.toml
+++ b/querier/Cargo.toml
@@ -29,6 +29,7 @@ read_buffer = { path = "../read_buffer" }
 service_common = { path = "../service_common" }
 service_grpc_schema = { path = "../service_grpc_schema" }
 schema = { path = "../schema" }
+sharder = { path = "../sharder" }
 snafu = "0.7"
 thiserror = "1.0"
 iox_time = { path = "../iox_time" }
diff --git a/querier/src/database.rs b/querier/src/database.rs
index 98fef73b8f..401df5d2e0 100644
--- a/querier/src/database.rs
+++ b/querier/src/database.rs
@@ -6,10 +6,11 @@ use crate::{
 };
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
-use data_types::Namespace;
+use data_types::{KafkaPartition, Namespace};
 use iox_query::exec::Executor;
 use parquet_file::storage::ParquetStorage;
 use service_common::QueryDatabaseProvider;
+use sharder::JumpHash;
 use std::sync::Arc;
 use tracker::{
     AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@@ -53,6 +54,10 @@ pub struct QuerierDatabase {
     ///
     /// If the same database is requested twice for different queries, it is counted twice.
     query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
+
+    /// Optional sharder to determine which ingesters to query for a particular table and
+    /// namespace. If not specified, all ingesters will be queried.
+    _sharder: Option<JumpHash<Arc<KafkaPartition>>>,
 }
 
 #[async_trait]
@@ -74,8 +79,9 @@ impl QueryDatabaseProvider for QuerierDatabase {
 impl QuerierDatabase {
     /// The maximum value for `max_concurrent_queries` that is allowed.
     ///
-    /// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore creation beyond that
-    /// will panic. The tokio limit is not exposed though so we pick a reasonable but smaller number.
+    /// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore
+    /// creation beyond that will panic. The tokio limit is not exposed though so we pick a
+    /// reasonable but smaller number.
     pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize;
 
     /// Create new database.
@@ -86,6 +92,7 @@ impl QuerierDatabase {
         exec: Arc<Executor>,
         ingester_connection: Arc<dyn IngesterConnection>,
         max_concurrent_queries: usize,
+        sharder: Option<JumpHash<Arc<KafkaPartition>>>,
     ) -> Self {
         assert!(
             max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@@ -117,13 +124,14 @@ impl QuerierDatabase {
             ingester_connection,
             query_log,
             query_execution_semaphore,
+            _sharder: sharder,
         }
     }
 
     /// Get namespace if it exists.
     ///
-    /// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER a semaphore permit
-    /// was acquired since this lowers the chance that we obtain stale data.
+    /// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER
+    /// a semaphore permit was acquired since this lowers the chance that we obtain stale data.
     pub async fn namespace(&self, name: &str) -> Option<Arc<QuerierNamespace>> {
         let name = Arc::from(name.to_owned());
         let schema = self
@@ -191,6 +199,7 @@ mod tests {
             catalog.exec(),
            create_ingester_connection_for_testing(),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
+            None,
         );
     }
 
@@ -211,6 +220,7 @@ mod tests {
             catalog.exec(),
             create_ingester_connection_for_testing(),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            None,
         );
 
         catalog.create_namespace("ns1").await;
@@ -236,6 +246,7 @@ mod tests {
             catalog.exec(),
             create_ingester_connection_for_testing(),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            None,
         );
 
         catalog.create_namespace("ns1").await;
diff --git a/querier/src/handler.rs b/querier/src/handler.rs
index 12670d4e24..2f75b3482f 100644
--- a/querier/src/handler.rs
+++ b/querier/src/handler.rs
@@ -181,6 +181,7 @@ mod tests {
             exec,
             create_ingester_connection_for_testing(),
             QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            None,
         ));
 
         let querier = QuerierHandlerImpl::new(catalog, database);