diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index d72c0489bb..2f298ea221 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -31,11 +31,7 @@ use query_functions::{ }; use schema::{selection::Selection, InfluxColumnType, Schema, TIME_COLUMN_NAME}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use std::{ - cmp::Reverse, - collections::{BTreeMap, BTreeSet}, - sync::Arc, -}; +use std::{cmp::Reverse, collections::BTreeSet, sync::Arc}; const CONCURRENT_TABLE_JOBS: usize = 10; @@ -458,7 +454,7 @@ impl InfluxRpcPlanner { /// the conditions specified by `predicate`. pub async fn tag_values( &self, - database: &dyn QueryDatabase, + database: Arc, tag_name: &str, rpc_predicate: InfluxRpcPredicate, ) -> Result { @@ -473,107 +469,122 @@ impl InfluxRpcPlanner { // distinct values that can be found from only metadata and // which need full plans - // Key is table name, value is set of chunks which had data - // for that table but that we couldn't evaluate the predicate - // entirely using the metadata - let mut need_full_plans = BTreeMap::new(); - let mut known_values = BTreeSet::new(); - let table_predicates = rpc_predicate .table_predicates(database.as_meta()) .context(CreatingPredicatesSnafu)?; - for (table_name, predicate) in &table_predicates { - let chunks = database - .chunks(table_name, predicate, ctx.child_ctx("table chunks")) - .await - .context(GettingChunksSnafu { table_name })?; - for chunk in cheap_chunk_first(chunks) { - // If there are delete predicates, we need to scan (or do full plan) the data to eliminate - // deleted data before getting tag values - let mut do_full_plan = chunk.has_delete_predicates(); - // Try and apply the predicate using only metadata - let pred_result = chunk.apply_predicate_to_metadata(predicate).context( - CheckingChunkPredicateSnafu { - chunk_id: chunk.id(), - }, - )?; + // filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk scan (which includes + // ingester RPC) + let mut table_predicates_filtered = Vec::with_capacity(table_predicates.len()); + for (table_name, predicate) in table_predicates { + let schema = database + .table_schema(&table_name) + .context(TableRemovedSnafu { + table_name: &table_name, + })?; - if matches!(pred_result, PredicateMatch::Zero) { - continue; - } + // Skip this table if the tag_name is not a column in this table + if schema.find_index_of(tag_name).is_none() { + continue; + }; - // use schema to validate column type - let schema = chunk.schema(); + table_predicates_filtered.push((table_name, predicate)); + } - // Skip this table if the tag_name is not a column in this table - let idx = if let Some(idx) = schema.find_index_of(tag_name) { - idx - } else { - continue; - }; + let tables: Vec<_> = + table_chunk_stream(Arc::clone(&database), &table_predicates_filtered, &ctx) + .and_then(|(table_name, predicate, chunks)| async move { + let mut chunks_full = vec![]; + let mut known_values = BTreeSet::new(); - // Validate that this really is a Tag column - let (influx_column_type, field) = schema.field(idx); - ensure!( - matches!(influx_column_type, Some(InfluxColumnType::Tag)), - InvalidTagColumnSnafu { - tag_name, - influx_column_type, - } - ); - ensure!( - influx_column_type - .unwrap() - .valid_arrow_type(field.data_type()), - InternalInvalidTagTypeSnafu { - tag_name, - data_type: field.data_type().clone(), - } - ); + for chunk in cheap_chunk_first(chunks) { + // Try and apply the predicate using only metadata + let pred_result = chunk.apply_predicate_to_metadata(predicate).context( + CheckingChunkPredicateSnafu { + chunk_id: chunk.id(), + }, + )?; - if !do_full_plan { - // try and get the list of values directly from metadata - let maybe_values = chunk - .column_values( - self.ctx.child_ctx("tag_values execution"), - tag_name, - predicate, - ) - .context(FindingColumnValuesSnafu)?; + if matches!(pred_result, PredicateMatch::Zero) { + continue; + } - match maybe_values { - Some(mut names) => { + // use schema to validate column type + let schema = chunk.schema(); + + // Skip this table if the tag_name is not a column in this chunk + // Note: This may happen even when the table contains the tag_name, because some chunks may not + // contain all columns. + let idx = if let Some(idx) = schema.find_index_of(tag_name) { + idx + } else { + continue; + }; + + // Validate that this really is a Tag column + let (influx_column_type, field) = schema.field(idx); + ensure!( + matches!(influx_column_type, Some(InfluxColumnType::Tag)), + InvalidTagColumnSnafu { + tag_name, + influx_column_type, + } + ); + ensure!( + influx_column_type + .unwrap() + .valid_arrow_type(field.data_type()), + InternalInvalidTagTypeSnafu { + tag_name, + data_type: field.data_type().clone(), + } + ); + + // If there are delete predicates, we need to scan (or do full plan) the data to eliminate + // deleted data before getting tag values + if chunk.has_delete_predicates() { debug!( %table_name, - names=?names, chunk_id=%chunk.id().get(), - "tag values found from metadata", + "need full plan to find tag values" ); - known_values.append(&mut names); - } - None => { - do_full_plan = true; + + chunks_full.push(chunk); + } else { + // try and get the list of values directly from metadata + let mut ctx = self.ctx.child_ctx("tag_values execution"); + ctx.set_metadata("table", table_name.to_owned()); + + let maybe_values = chunk + .column_values(ctx, tag_name, predicate) + .context(FindingColumnValuesSnafu)?; + + match maybe_values { + Some(mut names) => { + debug!( + %table_name, + names=?names, + chunk_id=%chunk.id().get(), + "tag values found from metadata", + ); + known_values.append(&mut names); + } + None => { + debug!( + %table_name, + chunk_id=%chunk.id().get(), + "need full plan to find tag values" + ); + chunks_full.push(chunk); + } + } } } - } - // can't get columns only from metadata, need - // a general purpose plan - if do_full_plan { - debug!( - %table_name, - chunk_id=%chunk.id().get(), - "need full plan to find tag values" - ); - - need_full_plans - .entry(table_name) - .or_insert_with(Vec::new) - .push(Arc::clone(&chunk)); - } - } - } + Ok((table_name, predicate, chunks_full, known_values)) + }) + .try_collect() + .await?; let mut builder = StringSetPlanBuilder::new(); @@ -582,17 +593,21 @@ impl InfluxRpcPlanner { // At this point, we have a set of tag_values we know at plan // time in `known_columns`, and some tables in chunks that we // need to run a plan to find what values pass the predicate. - for (table_name, predicate) in &table_predicates { - if let Some(chunks) = need_full_plans.remove(table_name) { + for (table_name, predicate, chunks_full, known_values) in tables { + builder = builder.append_other(known_values.into()); + + if !chunks_full.is_empty() { let schema = database .table_schema(table_name) .context(TableRemovedSnafu { table_name })?; - let scan_and_filter = - ScanPlanBuilder::new(schema, ctx.child_ctx("scan_and_filter planning")) - .with_chunks(chunks) - .with_predicate(predicate) - .build()?; + let mut ctx = ctx.child_ctx("scan_and_filter planning"); + ctx.set_metadata("table", table_name.to_owned()); + + let scan_and_filter = ScanPlanBuilder::new(schema, ctx) + .with_chunks(chunks_full) + .with_predicate(predicate) + .build()?; let tag_name_is_not_null = Expr::Column(tag_name.into()).is_not_null(); @@ -619,11 +634,7 @@ impl InfluxRpcPlanner { } } - // add the known values we could find from metadata only - builder - .append_other(known_values.into()) - .build() - .context(CreatingStringSetSnafu) + builder.build().context(CreatingStringSetSnafu) } /// Returns a plan that produces a list of columns and their @@ -1792,7 +1803,7 @@ mod tests { run_test(|test_db, rpc_predicate| { async move { InfluxRpcPlanner::new(IOxSessionContext::with_testing()) - .tag_values(test_db.as_ref(), "foo", rpc_predicate) + .tag_values(test_db, "foo", rpc_predicate) .await .expect("creating plan"); } diff --git a/query_tests/src/db.rs b/query_tests/src/db.rs index eab4fb45e5..c6d4362e0a 100644 --- a/query_tests/src/db.rs +++ b/query_tests/src/db.rs @@ -13,11 +13,6 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase /// This is required due to . fn as_catalog_provider_arc(self: Arc) -> Arc; - /// Upcast to [`QueryDatabase`]. - /// - /// This is required due to . - fn as_query_database(&self) -> &dyn QueryDatabase; - /// Upcast to [`QueryDatabase`]. /// /// This is required due to . @@ -33,10 +28,6 @@ impl AbstractDb for QuerierNamespace { self as _ } - fn as_query_database(&self) -> &dyn QueryDatabase { - self - } - fn as_query_database_arc(self: Arc) -> Arc { self } diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index 996eca2ef0..7a99ab59e7 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -30,7 +30,7 @@ async fn run_tag_values_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .tag_values(db.as_query_database(), tag_name, predicate.clone()) + .tag_values(db.as_query_database_arc(), tag_name, predicate.clone()) .await .expect("built plan successfully"); let names = ctx @@ -318,7 +318,7 @@ async fn list_tag_values_field_col_on_tag() { let tag_name = "temp"; let plan_result = planner .tag_values( - db.as_query_database(), + db.as_query_database_arc(), tag_name, InfluxRpcPredicate::default(), ) diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index 07ba12518b..6431963aad 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -104,7 +104,7 @@ impl Planner { self.ctx .run(async move { planner - .tag_values(database.as_ref(), &tag_name, predicate) + .tag_values(database, &tag_name, predicate) .await .map_err(|e| Error::Plan(format!("tag_values error: {}", e))) }) diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 16edeedad5..d83046390c 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -2120,7 +2120,9 @@ mod tests { let db_info = org_and_bucket(); - let chunk = TestChunk::new("my_table").with_error("Sugar we are going down"); + let chunk = TestChunk::new("my_table") + .with_tag_column("the_tag_key") + .with_error("Sugar we are going down"); fixture .test_storage @@ -2488,7 +2490,9 @@ mod tests { let db_info = org_and_bucket(); - let chunk = TestChunk::new("m5").with_error("Sugar we are going down"); + let chunk = TestChunk::new("m5") + .with_tag_column("the_tag_key") + .with_error("Sugar we are going down"); fixture .test_storage