diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs
index 9440f3114c..6bdf55bbf4 100644
--- a/querier/src/namespace/mod.rs
+++ b/querier/src/namespace/mod.rs
@@ -19,10 +19,13 @@ mod test_util;
 /// Maps a catalog namespace to all the in-memory resources and sync-state that the querier needs.
 ///
 /// # Data Structures & Sync
-/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not implement async
-/// schema inspection. The actual payload (chunks and tombstones) are only queried on demand.
 ///
-/// Most access to the [IOx Catalog](iox_catalog::interface::Catalog) are cached via [`CatalogCache`].
+/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not
+/// implement async schema inspection. The actual payload (chunks and tombstones) are only queried
+/// on demand.
+///
+/// Most accesses to the [IOx Catalog](iox_catalog::interface::Catalog) are cached via
+/// [`CatalogCache`].
 #[derive(Debug)]
 pub struct QuerierNamespace {
     /// ID of this namespace.
@@ -37,7 +40,7 @@ pub struct QuerierNamespace {
     /// Executor for queries.
     exec: Arc<Executor>,
 
-    /// Catalog cache
+    /// Catalog cache.
     catalog_cache: Arc<CatalogCache>,
 
     /// Query log.
@@ -63,12 +66,8 @@ impl QuerierNamespace {
             let id = table_schema.id;
             let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
 
-            // Currently, the sharder will only return one sequencer ID per table, but in the
-            // near future, the sharder might return more than one sequencer ID for one table.
-            let sequencer_ids = vec![**sharder.shard_for_query(&table_name, &name)];
-
             let table = Arc::new(QuerierTable::new(
-                sequencer_ids,
+                Arc::clone(&sharder),
                 Arc::clone(&name),
                 id,
                 Arc::clone(&table_name),
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 2adbabad47..070f879378 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -11,6 +11,7 @@ use iox_query::{provider::ChunkPruner, QueryChunk};
 use observability_deps::tracing::debug;
 use predicate::Predicate;
 use schema::Schema;
+use sharder::JumpHash;
 use snafu::{ResultExt, Snafu};
 use std::{
     collections::{hash_map::Entry, HashMap},
@@ -52,8 +53,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Table representation for the querier.
 #[derive(Debug)]
 pub struct QuerierTable {
-    /// Sequencers responsible for the table's data that need to be queried for unpersisted data
-    sequencer_ids: Vec<KafkaPartition>,
+    /// Sharder to query for which sequencers are responsible for the table's data
+    sharder: Arc<JumpHash<Arc<KafkaPartition>>>,
 
     /// Namespace the table is in
     namespace_name: Arc<str>,
@@ -80,7 +81,7 @@ pub struct QuerierTable {
 impl QuerierTable {
     /// Create new table.
     pub fn new(
-        sequencer_ids: Vec<KafkaPartition>,
+        sharder: Arc<JumpHash<Arc<KafkaPartition>>>,
         namespace_name: Arc<str>,
         id: TableId,
         table_name: Arc<str>,
@@ -95,7 +96,7 @@ impl QuerierTable {
         );
 
         Self {
-            sequencer_ids,
+            sharder,
             namespace_name,
             table_name,
             id,
@@ -189,10 +190,18 @@
             .map(|(_, f)| f.name().to_string())
             .collect();
 
-        // get any chunks from the ingester
+        // Get the sequencer IDs responsible for this table's data from the sharder to
+        // determine which ingester(s) to query.
+        // Currently, the sharder will only return one sequencer ID per table, but in the
+        // near future, the sharder might return more than one sequencer ID for one table.
+        let sequencer_ids = vec![**self
+            .sharder
+            .shard_for_query(&self.table_name, &self.namespace_name)];
+
+        // get any chunks from the ingester(s)
         let partitions_result = ingester_connection
             .partitions(
-                &self.sequencer_ids,
+                &sequencer_ids,
                 Arc::clone(&self.namespace_name),
                 Arc::clone(&self.table_name),
                 columns,
diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs
index 9db91bbb08..691b55a604 100644
--- a/querier/src/table/test_util.rs
+++ b/querier/src/table/test_util.rs
@@ -10,6 +10,7 @@ use iox_tests::util::{TestCatalog, TestPartition, TestSequencer, TestTable};
 use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
 use parquet_file::storage::ParquetStorage;
 use schema::{selection::Selection, sort::SortKey, Schema};
+use sharder::JumpHash;
 use std::sync::Arc;
 
 /// Create a [`QuerierTable`] for testing.
@@ -37,7 +38,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -> QuerierTable {
     let namespace_name = Arc::from(table.namespace.namespace.name.as_str());
 
     QuerierTable::new(
-        vec![KafkaPartition::new(0)],
+        Arc::new(JumpHash::new((0..1).map(KafkaPartition::new).map(Arc::new)).unwrap()),
         namespace_name,
         table.table.id,
         table.table.name.clone().into(),