refactor: Move sharding query from namespace to table

Better supports future work. Fixes #5003.
pull/24376/head
Carol (Nichols || Goulding) 2022-06-30 10:35:27 -04:00
parent be53716e4d
commit 380166f4c0
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
3 changed files with 25 additions and 16 deletions

View File

@ -19,10 +19,13 @@ mod test_util;
/// Maps a catalog namespace to all the in-memory resources and sync-state that the querier needs.
///
/// # Data Structures & Sync
/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not implement async
/// schema inspection. The actual payload (chunks and tombstones) are only queried on demand.
///
/// Most access to the [IOx Catalog](iox_catalog::interface::Catalog) are cached via [`CatalogCache`].
/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not
/// implement async schema inspection. The actual payload (chunks and tombstones) are only queried
/// on demand.
///
/// Most accesses to the [IOx Catalog](iox_catalog::interface::Catalog) are cached via
/// [`CatalogCache`].
#[derive(Debug)]
pub struct QuerierNamespace {
/// ID of this namespace.
@ -37,7 +40,7 @@ pub struct QuerierNamespace {
/// Executor for queries.
exec: Arc<Executor>,
/// Catalog cache
/// Catalog cache.
catalog_cache: Arc<CatalogCache>,
/// Query log.
@ -63,12 +66,8 @@ impl QuerierNamespace {
let id = table_schema.id;
let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
// Currently, the sharder will only return one sequencer ID per table, but in the
// near future, the sharder might return more than one sequencer ID for one table.
let sequencer_ids = vec![**sharder.shard_for_query(&table_name, &name)];
let table = Arc::new(QuerierTable::new(
sequencer_ids,
Arc::clone(&sharder),
Arc::clone(&name),
id,
Arc::clone(&table_name),

View File

@ -11,6 +11,7 @@ use iox_query::{provider::ChunkPruner, QueryChunk};
use observability_deps::tracing::debug;
use predicate::Predicate;
use schema::Schema;
use sharder::JumpHash;
use snafu::{ResultExt, Snafu};
use std::{
collections::{hash_map::Entry, HashMap},
@ -52,8 +53,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Table representation for the querier.
#[derive(Debug)]
pub struct QuerierTable {
/// Sequencers responsible for the table's data that need to be queried for unpersisted data
sequencer_ids: Vec<KafkaPartition>,
/// Sharder to query for which sequencers are responsible for the table's data
sharder: Arc<JumpHash<Arc<KafkaPartition>>>,
/// Namespace the table is in
namespace_name: Arc<str>,
@ -80,7 +81,7 @@ pub struct QuerierTable {
impl QuerierTable {
/// Create new table.
pub fn new(
sequencer_ids: Vec<KafkaPartition>,
sharder: Arc<JumpHash<Arc<KafkaPartition>>>,
namespace_name: Arc<str>,
id: TableId,
table_name: Arc<str>,
@ -95,7 +96,7 @@ impl QuerierTable {
);
Self {
sequencer_ids,
sharder,
namespace_name,
table_name,
id,
@ -189,10 +190,18 @@ impl QuerierTable {
.map(|(_, f)| f.name().to_string())
.collect();
// get any chunks from the ingester
// Get the sequencer IDs responsible for this table's data from the sharder to
// determine which ingester(s) to query.
// Currently, the sharder will only return one sequencer ID per table, but in the
// near future, the sharder might return more than one sequencer ID for one table.
let sequencer_ids = vec![**self
.sharder
.shard_for_query(&self.table_name, &self.namespace_name)];
// get any chunks from the ingester(s)
let partitions_result = ingester_connection
.partitions(
&self.sequencer_ids,
&sequencer_ids,
Arc::clone(&self.namespace_name),
Arc::clone(&self.table_name),
columns,

View File

@ -10,6 +10,7 @@ use iox_tests::util::{TestCatalog, TestPartition, TestSequencer, TestTable};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use parquet_file::storage::ParquetStorage;
use schema::{selection::Selection, sort::SortKey, Schema};
use sharder::JumpHash;
use std::sync::Arc;
/// Create a [`QuerierTable`] for testing.
@ -37,7 +38,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
let namespace_name = Arc::from(table.namespace.namespace.name.as_str());
QuerierTable::new(
vec![KafkaPartition::new(0)],
Arc::new(JumpHash::new((0..1).map(KafkaPartition::new).map(Arc::new)).unwrap()),
namespace_name,
table.table.id,
table.table.name.clone().into(),