fix: Create querier sharder from catalog sequencer info

Panic if there are no sequencers in the catalog.
pull/24376/head
Carol (Nichols || Goulding) 2022-06-13 12:15:10 -04:00
parent 553590fb23
commit e9cdaffe74
11 changed files with 188 additions and 205 deletions
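
In short: instead of deriving the querier's shard set from an optional write-buffer connection, the querier now reads the sequencer records out of the catalog at startup and collects their Kafka partitions into a JumpHash sharder. A condensed sketch of that flow, using the IOx types that appear in the diff below (the `sharder_from_catalog` wrapper name is hypothetical; the real code is the `create_sharder` function added in this commit, with proper error handling):

async fn sharder_from_catalog(catalog: &dyn Catalog) -> JumpHash<Arc<KafkaPartition>> {
    // List every sequencer the routers have registered in the catalog.
    let sequencers = catalog
        .repositories()
        .await
        .sequencers()
        .list()
        .await
        .expect("catalog unavailable");

    // Ordered set: every node must observe the same shards in the same order.
    let shards: BTreeSet<KafkaPartition> = sequencers
        .into_iter()
        .map(|sequencer| sequencer.kafka_partition)
        .collect();

    // Collecting into JumpHash panics with "cannot initialise sharder with
    // no shards" when the set is empty; that is the startup panic this
    // commit introduces.
    shards.into_iter().map(Arc::new).collect()
}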

Cargo.lock (generated)

@@ -2422,6 +2422,7 @@ dependencies = [
"observability_deps",
"parquet_file",
"schema",
"sharder",
"uuid 0.8.2",
"workspace-hack",
]


@@ -63,57 +63,6 @@ pub struct WriteBufferConfig {
pub(crate) auto_create_topics: Option<NonZeroU32>,
}
/// For use by the querier. If these options are specified, the querier can use the same sharding
/// that the router uses to know the subset of ingesters to query. If these options are not
/// specified, the querier will ask all ingesters.
#[derive(Debug, clap::Parser)]
pub struct OptionalWriteBufferConfig {
/// The type of write buffer to use.
///
/// Valid options are: file, kafka
#[clap(long = "--write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE")]
pub(crate) type_: Option<String>,
/// The address to the write buffer.
#[clap(long = "--write-buffer-addr", env = "INFLUXDB_IOX_WRITE_BUFFER_ADDR")]
pub(crate) connection_string: Option<String>,
/// Write buffer topic/database that should be used.
#[clap(
long = "--write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared"
)]
pub(crate) topic: String,
/// Write buffer connection config.
///
/// The concrete options depend on the write buffer type.
///
/// Command line arguments are passed as
/// `--write-buffer-connection-config key1=value1 key2=value2` or
/// `--write-buffer-connection-config key1=value1,key2=value2`.
///
/// Environment variables are passed as `key1=value1,key2=value2,...`.
#[clap(
long = "--write-buffer-connection-config",
env = "INFLUXDB_IOX_WRITE_BUFFER_CONNECTION_CONFIG",
default_value = "",
multiple_values = true,
use_value_delimiter = true,
action
)]
pub(crate) connection_config: Vec<String>,
/// The number of topics to create automatically, if any. Default is to not create any topics.
#[clap(
long = "--write-buffer-auto-create-topics",
env = "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS",
action
)]
pub(crate) auto_create_topics: Option<NonZeroU32>,
}
impl WriteBufferConfig {
/// Create a new instance for all-in-one mode, only allowing some arguments.
/// If `database_directory` is not specified, creates a new temporary directory.
@@ -218,47 +167,6 @@ impl WriteBufferConfig {
}
}
impl From<WriteBufferConfig> for OptionalWriteBufferConfig {
fn from(write_buffer_config: WriteBufferConfig) -> Self {
Self {
type_: Some(write_buffer_config.type_.clone()),
connection_string: Some(write_buffer_config.connection_string.clone()),
topic: write_buffer_config.topic.clone(),
connection_config: write_buffer_config.connection_config.clone(),
auto_create_topics: write_buffer_config.auto_create_topics,
}
}
}
impl OptionalWriteBufferConfig {
/// Initialize a [`WriteBufferWriting`] if the options are specified
pub async fn writing(
&self,
metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Option<Arc<dyn WriteBufferWriting>>, WriteBufferError> {
match (self.type_.as_ref(), self.connection_string.as_ref()) {
(Some(type_), Some(connection_string)) => {
let write_buffer_config = WriteBufferConfig {
type_: type_.to_string(),
connection_string: connection_string.to_string(),
topic: self.topic.clone(),
connection_config: self.connection_config.clone(),
auto_create_topics: self.auto_create_topics,
};
let conn = write_buffer_config.conn();
let factory = WriteBufferConfig::factory(metrics);
Ok(Some(
factory
.new_config_write(&self.topic, trace_collector.as_ref(), &conn)
.await?,
))
}
_ => Ok(None),
}
}
}
#[cfg(test)]
mod tests {
use clap::StructOpt;


@@ -516,14 +516,12 @@ pub async fn command(config: Config) -> Result<()> {
let ingester_addresses = vec![format!("http://{}", ingester_run_config.grpc_bind_address)];
info!(?ingester_addresses, "starting querier");
let optional_write_buffer_config = write_buffer_config.into();
let querier = create_querier_server_type(QuerierServerTypeArgs {
common_state: &common_state,
metric_registry: Arc::clone(&metrics),
catalog,
object_store,
exec,
optional_write_buffer_config: &optional_write_buffer_config,
time_provider,
ingester_addresses,
querier_config,


@@ -3,7 +3,7 @@
use super::main;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, querier::QuerierConfig,
run_config::RunConfig, write_buffer::OptionalWriteBufferConfig,
run_config::RunConfig,
};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
@@ -66,9 +66,6 @@ pub struct Config {
#[clap(flatten)]
pub(crate) querier_config: QuerierConfig,
#[clap(flatten)]
pub(crate) optional_write_buffer_config: OptionalWriteBufferConfig,
}
pub async fn command(config: Config) -> Result<(), Error> {
@@ -108,7 +105,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
catalog,
object_store,
exec,
optional_write_buffer_config: &config.optional_write_buffer_config,
time_provider,
ingester_addresses,
querier_config: config.querier_config,


@@ -19,6 +19,7 @@ observability_deps = { path = "../observability_deps" }
parquet_file = { path = "../parquet_file" }
iox_query = { path = "../iox_query" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
futures = "0.3.21"


@@ -91,7 +91,26 @@ impl TestCatalog {
Arc::clone(&self.exec)
}
/// Create a namespace in the catalog
/// Create a sequencer in the catalog
pub async fn create_sequencer(self: &Arc<Self>, sequencer: i32) -> Arc<Sequencer> {
let mut repos = self.catalog.repositories().await;
let kafka_topic = repos
.kafka_topics()
.create_or_get("kafka_topic")
.await
.unwrap();
let kafka_partition = KafkaPartition::new(sequencer);
Arc::new(
repos
.sequencers()
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap(),
)
}
/// Create a namespace in the catalog
pub async fn create_namespace(self: &Arc<Self>, name: &str) -> Arc<TestNamespace> {
let mut repos = self.catalog.repositories().await;
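
Tests that construct a QuerierDatabase use the new helper to seed the catalog before start-up, as the updated tests below do:

// Seed one sequencer (Kafka partition 0) so QuerierDatabase::new can
// build its sharder instead of panicking on an empty catalog.
catalog.create_sequencer(0).await;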


@@ -1,6 +1,5 @@
use async_trait::async_trait;
use clap_blocks::{querier::QuerierConfig, write_buffer::OptionalWriteBufferConfig};
use data_types::KafkaPartition;
use clap_blocks::querier::QuerierConfig;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@@ -20,9 +19,7 @@ use querier::{
create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
};
use sharder::JumpHash;
use std::{
collections::BTreeSet,
fmt::{Debug, Display},
sync::Arc,
};
@@ -147,7 +144,6 @@ pub struct QuerierServerTypeArgs<'a> {
pub catalog: Arc<dyn Catalog>,
pub object_store: Arc<DynObjectStore>,
pub exec: Arc<Executor>,
pub optional_write_buffer_config: &'a OptionalWriteBufferConfig,
pub time_provider: Arc<dyn TimeProvider>,
pub ingester_addresses: Vec<String>,
pub querier_config: QuerierConfig,
@@ -160,6 +156,9 @@ pub enum Error {
#[error("failed to create KafkaPartition from id: {0}")]
InvalidData(#[from] std::num::TryFromIntError),
#[error("querier error: {0}")]
Querier(#[from] querier::QuerierDatabaseError),
}
/// Instantiate a querier server
@@ -176,22 +175,17 @@ pub async fn create_querier_server_type(
let ingester_connection =
create_ingester_connection(args.ingester_addresses, Arc::clone(&catalog_cache));
let sharder = maybe_sharder(
args.optional_write_buffer_config,
Arc::clone(&args.metric_registry),
args.common_state.trace_collector(),
)
.await?;
let database = Arc::new(QuerierDatabase::new(
catalog_cache,
Arc::clone(&args.metric_registry),
ParquetStorage::new(args.object_store),
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries(),
sharder,
));
let database = Arc::new(
QuerierDatabase::new(
catalog_cache,
Arc::clone(&args.metric_registry),
ParquetStorage::new(args.object_store),
args.exec,
ingester_connection,
args.querier_config.max_concurrent_queries(),
)
.await?,
);
let querier_handler = Arc::new(QuerierHandlerImpl::new(args.catalog, Arc::clone(&database)));
let querier = QuerierServer::new(args.metric_registry, querier_handler);
@@ -201,31 +195,3 @@ pub async fn create_querier_server_type(
args.common_state,
)))
}
pub async fn maybe_sharder(
optional_write_buffer_config: &OptionalWriteBufferConfig,
metric_registry: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Option<JumpHash<Arc<KafkaPartition>>>, Error> {
optional_write_buffer_config
.writing(metric_registry, trace_collector)
.await?
.map(|write_buffer| {
// Construct the (ordered) set of sequencers.
//
// The sort order must be deterministic in order for all nodes to shard to
// the same sequencers, therefore we type assert the returned set is of the
// ordered variety.
let shards: BTreeSet<_> = write_buffer.sequencer_ids();
// ^ don't change this to an unordered set
Ok(shards
.into_iter()
.map(|id| Ok(KafkaPartition::new(id.try_into()?)))
.collect::<Result<Vec<KafkaPartition>, Error>>()?
.into_iter()
.map(Arc::new)
.collect::<JumpHash<_>>())
})
.transpose()
}


@@ -53,32 +53,37 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl
#[cfg(test)]
mod tests {
use super::*;
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService;
use iox_tests::util::TestCatalog;
use parquet_file::storage::ParquetStorage;
use querier::{create_ingester_connection_for_testing, QuerierCatalogCache};
use super::*;
#[tokio::test]
async fn test_get_namespaces_empty() {
let catalog = TestCatalog::new();
// QuerierDatabase::new panics if there are no sequencers in the catalog
catalog.create_sequencer(0).await;
let catalog_cache = Arc::new(QuerierCatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
catalog.metric_registry(),
usize::MAX,
));
let db = Arc::new(QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
));
let db = Arc::new(
QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
)
.await
.unwrap(),
);
let service = NamespaceServiceImpl::new(db);
@@ -93,21 +98,27 @@ mod tests {
async fn test_get_namespaces() {
let catalog = TestCatalog::new();
// QuerierDatabase::new panics if there are no sequencers in the catalog
catalog.create_sequencer(0).await;
let catalog_cache = Arc::new(QuerierCatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
catalog.metric_registry(),
usize::MAX,
));
let db = Arc::new(QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
));
let db = Arc::new(
QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
)
.await
.unwrap(),
);
let service = NamespaceServiceImpl::new(db);
catalog.create_namespace("namespace2").await;


@@ -7,11 +7,13 @@ use crate::{
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{KafkaPartition, Namespace};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;
use service_common::QueryDatabaseProvider;
use sharder::JumpHash;
use std::sync::Arc;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
};
@@ -21,6 +23,15 @@ use tracker::{
/// That buffer is shared between all namespaces, and filtered on query
const QUERY_LOG_SIZE: usize = 10_000;
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Catalog error: {source}"))]
Catalog {
source: iox_catalog::interface::Error,
},
}
/// Database for the querier.
///
/// Contains all namespaces.
@@ -55,9 +66,8 @@ pub struct QuerierDatabase {
/// If the same database is requested twice for different queries, it is counted twice.
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
/// Optional sharder to determine which ingesters to query for a particular table and
/// namespace. If not specified, all ingesters will be queried.
_sharder: Option<JumpHash<Arc<KafkaPartition>>>,
/// Sharder to determine which ingesters to query for a particular table and namespace.
_sharder: JumpHash<Arc<KafkaPartition>>,
}
#[async_trait]
@@ -85,15 +95,14 @@ impl QuerierDatabase {
pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize;
/// Create new database.
pub fn new(
pub async fn new(
catalog_cache: Arc<CatalogCache>,
metric_registry: Arc<metric::Registry>,
store: ParquetStorage,
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
max_concurrent_queries: usize,
sharder: Option<JumpHash<Arc<KafkaPartition>>>,
) -> Self {
) -> Result<Self, Error> {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
"`max_concurrent_queries` ({}) > `max_concurrent_queries_MAX` ({})",
@@ -115,7 +124,9 @@ impl QuerierDatabase {
let query_execution_semaphore =
Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
Self {
let _sharder = create_sharder(catalog_cache.catalog().as_ref()).await?;
Ok(Self {
backoff_config: BackoffConfig::default(),
catalog_cache,
chunk_adapter,
@@ -124,8 +135,8 @@ impl QuerierDatabase {
ingester_connection,
query_log,
query_execution_semaphore,
_sharder: sharder,
}
_sharder,
})
}
/// Get namespace if it exists.
@@ -171,6 +182,29 @@ impl QuerierDatabase {
}
}
pub async fn create_sharder(catalog: &dyn Catalog) -> Result<JumpHash<Arc<KafkaPartition>>, Error> {
let sequencers = catalog
.repositories()
.await
.sequencers()
.list()
.await
.context(CatalogSnafu)?;
// Construct the (ordered) set of sequencers.
//
// The sort order must be deterministic in order for all nodes to shard to
// the same sequencers, therefore we type assert the returned set is of the
// ordered variety.
let shards: BTreeSet<_> = sequencers
// ^ don't change this to an unordered set
.into_iter()
.map(|sequencer| sequencer.kafka_partition)
.collect();
Ok(shards.into_iter().map(Arc::new).collect())
}
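
Why the ordering matters: JumpHash implements jump consistent hashing (Lamping & Veach), which maps a key to an index into the shard list, so all nodes agree on an assignment only if their shard lists are identical and identically ordered. A self-contained sketch of the underlying algorithm, independent of the sharder crate:

/// Jump consistent hash: deterministically map `key` to a bucket in
/// 0..num_buckets. Assumes num_buckets > 0, mirroring the sharder's
/// "no shards" panic. Given the same key and the same ordered bucket
/// list, every node picks the same bucket; an unordered shard set
/// would break that agreement, hence the BTreeSet above.
fn jump_hash(mut key: u64, num_buckets: u32) -> u32 {
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    while j < num_buckets as i64 {
        b = j;
        key = key.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / ((key >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

A node sharding a table would then compute something like jump_hash(hash(table, namespace), shards.len() as u32) and index into its ordered shard vector.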
#[cfg(test)]
mod tests {
use iox_tests::util::TestCatalog;
@@ -179,11 +213,11 @@ mod tests {
use super::*;
#[test]
#[tokio::test]
#[should_panic(
expected = "`max_concurrent_queries` (65536) > `max_concurrent_queries_MAX` (65535)"
)]
fn test_semaphore_limit_is_checked() {
async fn test_semaphore_limit_is_checked() {
let catalog = TestCatalog::new();
let catalog_cache = Arc::new(CatalogCache::new(
@@ -199,13 +233,40 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
None,
);
)
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "cannot initialise sharder with no shards")]
async fn sequencers_in_catalog_are_required_for_startup() {
let catalog = TestCatalog::new();
let catalog_cache = Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
catalog.metric_registry(),
usize::MAX,
));
QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
)
.await
.unwrap();
}
#[tokio::test]
async fn test_namespace() {
let catalog = TestCatalog::new();
// QuerierDatabase::new panics if there are no sequencers in the catalog
catalog.create_sequencer(0).await;
let catalog_cache = Arc::new(CatalogCache::new(
catalog.catalog(),
@@ -220,8 +281,9 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
);
)
.await
.unwrap();
catalog.create_namespace("ns1").await;
@@ -232,6 +294,8 @@ mod tests {
#[tokio::test]
async fn test_namespaces() {
let catalog = TestCatalog::new();
// QuerierDatabase::new panics if there are no sequencers in the catalog
catalog.create_sequencer(0).await;
let catalog_cache = Arc::new(CatalogCache::new(
catalog.catalog(),
@@ -246,8 +310,9 @@ mod tests {
catalog.exec(),
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
);
)
.await
.unwrap();
catalog.create_namespace("ns1").await;
catalog.create_namespace("ns2").await;


@@ -128,21 +128,19 @@ impl Drop for QuerierHandlerImpl {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::{cache::CatalogCache, create_ingester_connection_for_testing};
use data_types::KafkaPartition;
use iox_catalog::mem::MemCatalog;
use iox_query::exec::Executor;
use iox_time::{MockProvider, Time};
use object_store::memory::InMemory;
use parquet_file::storage::ParquetStorage;
use crate::{cache::CatalogCache, create_ingester_connection_for_testing};
use super::*;
use std::time::Duration;
#[tokio::test]
async fn test_shutdown() {
let querier = TestQuerier::new().querier;
let querier = TestQuerier::new().await.querier;
// does not exit w/o shutdown
tokio::select! {
@@ -162,7 +160,7 @@ mod tests {
}
impl TestQuerier {
fn new() -> Self {
async fn new() -> Self {
let metric_registry = Arc::new(metric::Registry::new());
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry))) as _;
let object_store = Arc::new(InMemory::new());
@@ -174,15 +172,35 @@ mod tests {
Arc::clone(&metric_registry),
usize::MAX,
));
let database = Arc::new(QuerierDatabase::new(
catalog_cache,
metric_registry,
ParquetStorage::new(object_store),
exec,
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
None,
));
// QuerierDatabase::new panics if there are no sequencers in the catalog
{
let mut repos = catalog.repositories().await;
let kafka_topic = repos
.kafka_topics()
.create_or_get("kafka_topic")
.await
.unwrap();
let kafka_partition = KafkaPartition::new(0);
repos
.sequencers()
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap();
}
let database = Arc::new(
QuerierDatabase::new(
catalog_cache,
metric_registry,
ParquetStorage::new(object_store),
exec,
create_ingester_connection_for_testing(),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
)
.await
.unwrap(),
);
let querier = QuerierHandlerImpl::new(catalog, database);
Self { querier }


@@ -23,7 +23,7 @@
mod tombstone;
pub use cache::CatalogCache as QuerierCatalogCache;
pub use database::QuerierDatabase;
pub use database::{Error as QuerierDatabaseError, QuerierDatabase};
pub use handler::{QuerierHandler, QuerierHandlerImpl};
pub use ingester::{
create_ingester_connection, create_ingester_connection_for_testing,