feat: Make specifying the write buffer, and thus getting a sharder, optional in querier

pull/24376/head
Carol (Nichols || Goulding) 2022-06-10 12:03:26 -04:00
parent 127467b5c4
commit 874ef89daa
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
10 changed files with 162 additions and 54 deletions

1
Cargo.lock generated
View File

@@ -3960,6 +3960,7 @@ dependencies = [
"schema",
"service_common",
"service_grpc_schema",
"sharder",
"snafu",
"test_helpers",
"thiserror",

View File

@@ -41,7 +41,57 @@ pub struct WriteBufferConfig {
///
/// The concrete options depend on the write buffer type.
///
/// Command line arguments are passed as `--write-buffer-connection-config key1=value1 key2=value2` or
/// Command line arguments are passed as
/// `--write-buffer-connection-config key1=value1 key2=value2` or
/// `--write-buffer-connection-config key1=value1,key2=value2`.
///
/// Environment variables are passed as `key1=value1,key2=value2,...`.
#[clap(
long = "--write-buffer-connection-config",
env = "INFLUXDB_IOX_WRITE_BUFFER_CONNECTION_CONFIG",
default_value = "",
multiple_values = true,
use_value_delimiter = true
)]
pub(crate) connection_config: Vec<String>,
/// The number of topics to create automatically, if any. Default is to not create any topics.
#[clap(
long = "--write-buffer-auto-create-topics",
env = "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS"
)]
pub(crate) auto_create_topics: Option<NonZeroU32>,
}
/// For use by the querier. If these options are specified, the querier can use the same sharding
/// that the router uses to know the subset of ingesters to query. If these options are not
/// specified, the querier will ask all ingesters.
#[derive(Debug, clap::Parser)]
pub struct OptionalWriteBufferConfig {
/// The type of write buffer to use.
///
/// Valid options are: file, kafka
#[clap(long = "--write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE")]
pub(crate) type_: Option<String>,
/// The address to the write buffer.
#[clap(long = "--write-buffer-addr", env = "INFLUXDB_IOX_WRITE_BUFFER_ADDR")]
pub(crate) connection_string: Option<String>,
/// Write buffer topic/database that should be used.
#[clap(
long = "--write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared"
)]
pub(crate) topic: String,
/// Write buffer connection config.
///
/// The concrete options depend on the write buffer type.
///
/// Command line arguments are passed as
/// `--write-buffer-connection-config key1=value1 key2=value2` or
/// `--write-buffer-connection-config key1=value1,key2=value2`.
///
/// Environment variables are passed as `key1=value1,key2=value2,...`.
@@ -168,6 +218,47 @@ impl WriteBufferConfig {
}
}
impl From<WriteBufferConfig> for OptionalWriteBufferConfig {
    /// Convert a required [`WriteBufferConfig`] into the optional form used by the
    /// querier, wrapping the always-present `type_` and `connection_string` in `Some`.
    fn from(write_buffer_config: WriteBufferConfig) -> Self {
        // `write_buffer_config` is consumed by value, so its fields can be moved out
        // directly; the previous `.clone()` calls were redundant allocations.
        Self {
            type_: Some(write_buffer_config.type_),
            connection_string: Some(write_buffer_config.connection_string),
            topic: write_buffer_config.topic,
            connection_config: write_buffer_config.connection_config,
            auto_create_topics: write_buffer_config.auto_create_topics,
        }
    }
}
impl OptionalWriteBufferConfig {
    /// Initialize a [`WriteBufferWriting`] if the options are specified
    ///
    /// Returns `Ok(None)` when either the write buffer type or the connection string
    /// is unset, so callers can treat "no write buffer configured" as a valid state
    /// rather than an error.
    pub async fn writing(
        &self,
        metrics: Arc<metric::Registry>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Result<Option<Arc<dyn WriteBufferWriting>>, WriteBufferError> {
        // Both the type AND the connection string must be present to build a writer;
        // any other combination falls through to `Ok(None)` below.
        match (self.type_.as_ref(), self.connection_string.as_ref()) {
            (Some(type_), Some(connection_string)) => {
                // Assemble a full (non-optional) config from the pieces so the
                // existing `WriteBufferConfig` machinery can be reused unchanged.
                let write_buffer_config = WriteBufferConfig {
                    type_: type_.to_string(),
                    connection_string: connection_string.to_string(),
                    topic: self.topic.clone(),
                    connection_config: self.connection_config.clone(),
                    auto_create_topics: self.auto_create_topics,
                };
                // NOTE(review): `conn()` presumably derives connection parameters from
                // the config above — confirm against `WriteBufferConfig::conn`.
                let conn = write_buffer_config.conn();
                let factory = WriteBufferConfig::factory(metrics);
                Ok(Some(
                    factory
                        .new_config_write(&self.topic, trace_collector.as_ref(), &conn)
                        .await?,
                ))
            }
            // Partially-specified options are treated the same as unspecified.
            _ => Ok(None),
        }
    }
}
#[cfg(test)]
mod tests {
use clap::StructOpt;

View File

@ -516,13 +516,14 @@ pub async fn command(config: Config) -> Result<()> {
let ingester_addresses = vec![format!("http://{}", ingester_run_config.grpc_bind_address)];
info!(?ingester_addresses, "starting querier");
let optional_write_buffer_config = write_buffer_config.into();
let querier = create_querier_server_type(QuerierServerTypeArgs {
common_state: &common_state,
metric_registry: Arc::clone(&metrics),
catalog,
object_store,
exec,
write_buffer_config: &write_buffer_config,
optional_write_buffer_config: &optional_write_buffer_config,
time_provider,
ingester_addresses,
querier_config,

View File

@ -3,7 +3,7 @@
use super::main;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig,
run_config::RunConfig, write_buffer::WriteBufferConfig,
run_config::RunConfig, write_buffer::OptionalWriteBufferConfig,
};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
@ -68,7 +68,7 @@ pub struct Config {
pub(crate) querier_config: QuerierConfig,
#[clap(flatten)]
pub(crate) write_buffer_config: WriteBufferConfig,
pub(crate) optional_write_buffer_config: OptionalWriteBufferConfig,
}
pub async fn command(config: Config) -> Result<(), Error> {
@ -100,9 +100,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
let ingester_addresses = config.querier_config.ingester_addresses()?;
info!(?ingester_addresses, "using ingester addresses");
// Sharding needs to know about the write buffer sequencer_ids. For now, this is fake.
let write_buffer_config = WriteBufferConfig::new("iox-shared", None);
let exec = Arc::new(Executor::new(num_threads));
let server_type = create_querier_server_type(QuerierServerTypeArgs {
@ -111,7 +108,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
catalog,
object_store,
exec,
write_buffer_config: &write_buffer_config,
optional_write_buffer_config: &config.optional_write_buffer_config,
time_provider,
ingester_addresses,
querier_config: config.querier_config,

View File

@ -4,17 +4,10 @@ use std::time::Duration;
#[tokio::test]
async fn test_logging() {
// Testing with querier mode because it has the least amount of setup needed.
// Testing with all-in-one mode because it has the least amount of setup needed.
Command::cargo_bin("influxdb_iox")
.unwrap()
.args(&[
"run",
"querier",
"--log-filter",
"info",
"--catalog",
"memory",
])
.args(&["run", "all-in-one", "--log-filter", "info"])
.timeout(Duration::from_secs(2))
.assert()
.failure()
@ -25,14 +18,7 @@ async fn test_logging() {
Command::cargo_bin("influxdb_iox")
.unwrap()
.args(&[
"run",
"querier",
"--log-filter",
"error",
"--catalog",
"memory",
])
.args(&["run", "all-in-one", "--log-filter", "error"])
.timeout(Duration::from_secs(2))
.assert()
.failure()

View File

@ -1,5 +1,6 @@
use async_trait::async_trait;
use clap_blocks::{querier::QuerierConfig, write_buffer::WriteBufferConfig};
use clap_blocks::{querier::QuerierConfig, write_buffer::OptionalWriteBufferConfig};
use data_types::KafkaPartition;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -19,7 +20,6 @@ use querier::{
create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
};
use router::sequencer::Sequencer;
use sharder::JumpHash;
use std::{
collections::BTreeSet,
@ -147,7 +147,7 @@ pub struct QuerierServerTypeArgs<'a> {
pub catalog: Arc<dyn Catalog>,
pub object_store: Arc<DynObjectStore>,
pub exec: Arc<Executor>,
pub write_buffer_config: &'a WriteBufferConfig,
pub optional_write_buffer_config: &'a OptionalWriteBufferConfig,
pub time_provider: Arc<dyn TimeProvider>,
pub ingester_addresses: Vec<String>,
pub querier_config: QuerierConfig,
@ -157,6 +157,9 @@ pub struct QuerierServerTypeArgs<'a> {
pub enum Error {
#[error("failed to initialise write buffer connection: {0}")]
WriteBuffer(#[from] write_buffer::core::WriteBufferError),
#[error("failed to create KafkaPartition from id: {0}")]
InvalidData(#[from] std::num::TryFromIntError),
}
/// Instantiate a querier server
@ -173,27 +176,12 @@ pub async fn create_querier_server_type(
let ingester_connection =
create_ingester_connection(args.ingester_addresses, Arc::clone(&catalog_cache));
let write_buffer = Arc::new(
args.write_buffer_config
.writing(
Arc::clone(&args.metric_registry),
args.common_state.trace_collector(),
)
.await?,
);
// Construct the (ordered) set of sequencers.
//
// The sort order must be deterministic in order for all nodes to shard to
// the same sequencers, therefore we type assert the returned set is of the
// ordered variety.
let shards: BTreeSet<_> = write_buffer.sequencer_ids();
// ^ don't change this to an unordered set
let _sharder: JumpHash<_> = shards
.into_iter()
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &args.metric_registry))
.map(Arc::new)
.collect();
let sharder = maybe_sharder(
args.optional_write_buffer_config,
Arc::clone(&args.metric_registry),
args.common_state.trace_collector(),
)
.await?;
let database = Arc::new(QuerierDatabase::new(
catalog_cache,
@ -202,6 +190,7 @@ pub async fn create_querier_server_type(
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries(),
sharder,
));
let querier_handler = Arc::new(QuerierHandlerImpl::new(args.catalog, Arc::clone(&database)));
@ -212,3 +201,31 @@ pub async fn create_querier_server_type(
args.common_state,
)))
}
pub async fn maybe_sharder(
optional_write_buffer_config: &OptionalWriteBufferConfig,
metric_registry: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Option<JumpHash<Arc<KafkaPartition>>>, Error> {
optional_write_buffer_config
.writing(metric_registry, trace_collector)
.await?
.map(|write_buffer| {
// Construct the (ordered) set of sequencers.
//
// The sort order must be deterministic in order for all nodes to shard to
// the same sequencers, therefore we type assert the returned set is of the
// ordered variety.
let shards: BTreeSet<_> = write_buffer.sequencer_ids();
// ^ don't change this to an unordered set
Ok(shards
.into_iter()
.map(|id| Ok(KafkaPartition::new(id.try_into()?)))
.collect::<Result<Vec<KafkaPartition>, Error>>()?
.into_iter()
.map(Arc::new)
.collect::<JumpHash<_>>())
})
.transpose()
}

View File

@ -77,6 +77,7 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
));
let service = NamespaceServiceImpl::new(db);
@ -105,6 +106,7 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
));
let service = NamespaceServiceImpl::new(db);

View File

@ -29,6 +29,7 @@ read_buffer = { path = "../read_buffer" }
service_common = { path = "../service_common" }
service_grpc_schema = { path = "../service_grpc_schema" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }
snafu = "0.7"
thiserror = "1.0"
iox_time = { path = "../iox_time" }

View File

@ -6,10 +6,11 @@ use crate::{
};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::Namespace;
use data_types::{KafkaPartition, Namespace};
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;
use service_common::QueryDatabaseProvider;
use sharder::JumpHash;
use std::sync::Arc;
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@ -53,6 +54,10 @@ pub struct QuerierDatabase {
///
/// If the same database is requested twice for different queries, it is counted twice.
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
/// Optional sharder to determine which ingesters to query for a particular table and
/// namespace. If not specified, all ingesters will be queried.
_sharder: Option<JumpHash<Arc<KafkaPartition>>>,
}
#[async_trait]
@ -74,8 +79,9 @@ impl QueryDatabaseProvider for QuerierDatabase {
impl QuerierDatabase {
/// The maximum value for `max_concurrent_queries` that is allowed.
///
/// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore creation beyond that
/// will panic. The tokio limit is not exposed though so we pick a reasonable but smaller number.
/// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore
/// creation beyond that will panic. The tokio limit is not exposed though so we pick a
/// reasonable but smaller number.
pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize;
/// Create new database.
@ -86,6 +92,7 @@ impl QuerierDatabase {
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
max_concurrent_queries: usize,
sharder: Option<JumpHash<Arc<KafkaPartition>>>,
) -> Self {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@ -117,13 +124,14 @@ impl QuerierDatabase {
ingester_connection,
query_log,
query_execution_semaphore,
_sharder: sharder,
}
}
/// Get namespace if it exists.
///
/// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER a semaphore permit
/// was acquired since this lowers the chance that we obtain stale data.
/// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER
/// a semaphore permit was acquired since this lowers the chance that we obtain stale data.
pub async fn namespace(&self, name: &str) -> Option<Arc<QuerierNamespace>> {
let name = Arc::from(name.to_owned());
let schema = self
@ -191,6 +199,7 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
None,
);
}
@ -211,6 +220,7 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
);
catalog.create_namespace("ns1").await;
@ -236,6 +246,7 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
);
catalog.create_namespace("ns1").await;

View File

@ -181,6 +181,7 @@ mod tests {
exec,
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
));
let querier = QuerierHandlerImpl::new(catalog, database);