From 274bd80ecd9d80fa50c841f55309c476ebcdd039 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 19 Sep 2022 15:27:18 +0200 Subject: [PATCH] refactor: concurrent table scan for "tag keys" (#5670) * refactor: concurrent table scan for "tag keys" Ref #5668. * feat: add table name to context metadata --- iox_query/src/frontend/influxrpc.rs | 241 +++++++++++++------------- query_tests/src/influxrpc/tag_keys.rs | 2 +- service_common/src/planner.rs | 2 +- 3 files changed, 122 insertions(+), 123 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index af2695981e..d72c0489bb 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -286,13 +286,10 @@ impl InfluxRpcPlanner { .table_schema(table_name) .context(TableRemovedSnafu { table_name })?; - let plan = Self::table_name_plan( - ctx.child_ctx("table name plan"), - table_name, - schema, - predicate, - chunks, - )?; + let mut ctx = ctx.child_ctx("table name plan"); + ctx.set_metadata("table", table_name.to_owned()); + + let plan = Self::table_name_plan(ctx, table_name, schema, predicate, chunks)?; builder = builder.append_other(plan.into()); } } @@ -307,7 +304,7 @@ impl InfluxRpcPlanner { /// conditions specified by `predicate`. pub async fn tag_keys( &self, - database: &dyn QueryDatabase, + database: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("tag_keys planning"); @@ -323,139 +320,137 @@ impl InfluxRpcPlanner { // 2. For each table/chunk pair, figure out which can be found // from only metadata and which need full plans - // Key is table name, value is set of chunks which had data - // for that table but that we couldn't evaluate the predicate - // entirely using the metadata - let mut need_full_plans = BTreeMap::new(); - let mut known_columns = BTreeSet::new(); - let table_predicates = rpc_predicate .table_predicates(database.as_meta()) .context(CreatingPredicatesSnafu)?; - for (table_name, predicate) in &table_predicates { + + let mut table_predicates_need_chunks = vec![]; + let mut builder = StringSetPlanBuilder::new(); + for (table_name, predicate) in table_predicates { if predicate.is_empty() { // special case - return the columns from metadata only. // Note that columns with all rows deleted will still show here - known_columns.extend( + builder = builder.append_other( database - .table_schema(table_name) + .table_schema(&table_name) .context(TableRemovedSnafu { table_name })? .tags_iter() - .map(|f| f.name().clone()), + .map(|f| f.name().clone()) + .collect::>() + .into(), ); - continue; - } - - let chunks = database - .chunks(table_name, predicate, ctx.child_ctx("table chunks")) - .await - .context(GettingChunksSnafu { table_name })?; - for chunk in cheap_chunk_first(chunks) { - // If there are delete predicates, we need to scan (or do full plan) the data to eliminate - // deleted data before getting tag keys - let mut do_full_plan = chunk.has_delete_predicates(); - - // Try and apply the predicate using only metadata - let pred_result = chunk.apply_predicate_to_metadata(predicate).context( - CheckingChunkPredicateSnafu { - chunk_id: chunk.id(), - }, - )?; - - if matches!(pred_result, PredicateMatch::Zero) { - continue; - } - - // get only tag columns from metadata - let schema = chunk.schema(); - - let column_names: Vec<&str> = schema - .tags_iter() - .map(|f| f.name().as_str()) - .collect::>(); - - let selection = Selection::Some(&column_names); - - if !do_full_plan { - // filter the columns further from the predicate - let maybe_names = chunk - .column_names( - ctx.child_ctx("column_names execution"), - predicate, - selection, - ) - .context(FindingColumnNamesSnafu)?; - - match maybe_names { - Some(mut names) => { - debug!( - %table_name, - names=?names, - chunk_id=%chunk.id().get(), - "column names found from metadata", - ); - known_columns.append(&mut names); - } - None => { - do_full_plan = true; - } - } - } - - // can't get columns only from metadata, need - // a general purpose plan - if do_full_plan { - debug!( - %table_name, - chunk_id=%chunk.id().get(), - "column names need full plan" - ); - - need_full_plans - .entry(table_name) - .or_insert_with(Vec::new) - .push(Arc::clone(&chunk)); - } + } else { + table_predicates_need_chunks.push((table_name, predicate)); } } - let mut builder = StringSetPlanBuilder::new(); + let tables: Vec<_> = + table_chunk_stream(Arc::clone(&database), &table_predicates_need_chunks, &ctx) + .and_then(|(table_name, predicate, chunks)| { + let mut ctx = ctx.child_ctx("table"); + ctx.set_metadata("table", table_name.to_owned()); + + async move { + let mut chunks_full = vec![]; + let mut known_columns = BTreeSet::new(); + + for chunk in cheap_chunk_first(chunks) { + // Try and apply the predicate using only metadata + let pred_result = chunk + .apply_predicate_to_metadata(predicate) + .context(CheckingChunkPredicateSnafu { + chunk_id: chunk.id(), + })?; + + if matches!(pred_result, PredicateMatch::Zero) { + continue; + } + + // get only tag columns from metadata + let schema = chunk.schema(); + + let column_names: Vec<&str> = schema + .tags_iter() + .map(|f| f.name().as_str()) + .collect::>(); + + let selection = Selection::Some(&column_names); + + // If there are delete predicates, we need to scan (or do full plan) the data to eliminate + // deleted data before getting tag keys + if chunk.has_delete_predicates() { + debug!( + %table_name, + chunk_id=%chunk.id().get(), + "column names need full plan" + ); + chunks_full.push(chunk); + } else { + // filter the columns further from the predicate + let maybe_names = chunk + .column_names( + ctx.child_ctx("column_names execution"), + predicate, + selection, + ) + .context(FindingColumnNamesSnafu)?; + + match maybe_names { + Some(mut names) => { + debug!( + %table_name, + names=?names, + chunk_id=%chunk.id().get(), + "column names found from metadata", + ); + known_columns.append(&mut names); + } + None => { + debug!( + %table_name, + chunk_id=%chunk.id().get(), + "column names need full plan" + ); + chunks_full.push(chunk); + } + } + } + } + + Ok((table_name, predicate, chunks_full, known_columns)) + } + }) + .try_collect() + .await?; // At this point, we have a set of column names we know pass // in `known_columns`, and potentially some tables in chunks // that we need to run a plan to know if they pass the // predicate. - if !need_full_plans.is_empty() { - // TODO an additional optimization here would be to filter - // out chunks (and tables) where all columns in that chunk - // were already known to have data (based on the contents of known_columns) + for (table_name, predicate, chunks_full, known_columns) in tables { + builder = builder.append_other(known_columns.into()); - for (table_name, predicate) in &table_predicates { - if let Some(chunks) = need_full_plans.remove(table_name) { - let schema = database - .table_schema(table_name) - .context(TableRemovedSnafu { table_name })?; + if !chunks_full.is_empty() { + // TODO an additional optimization here would be to filter + // out chunks (and tables) where all columns in that chunk + // were already known to have data (based on the contents of known_columns) - let plan = self.tag_keys_plan( - ctx.child_ctx("tag_keys_plan"), - table_name, - schema, - predicate, - chunks, - )?; + let schema = database + .table_schema(table_name) + .context(TableRemovedSnafu { table_name })?; - if let Some(plan) = plan { - builder = builder.append_other(plan) - } + let mut ctx = ctx.child_ctx("tag_keys_plan"); + ctx.set_metadata("table", table_name.to_owned()); + + let plan = self.tag_keys_plan(ctx, table_name, schema, predicate, chunks_full)?; + + if let Some(plan) = plan { + builder = builder.append_other(plan) } } } - // add the known columns we could find from metadata only - builder - .append_other(known_columns.into()) - .build() - .context(CreatingStringSetSnafu) + builder.build().context(CreatingStringSetSnafu) } /// Returns a plan which finds the distinct, non-null tag values @@ -1312,12 +1307,14 @@ fn table_chunk_stream<'a>( ) -> impl Stream>)>> + 'a { futures::stream::iter(table_predicates) .map(move |(table_name, predicate)| { - let ctx = ctx.child_ctx("table chunks"); + let mut ctx = ctx.child_ctx("table"); + ctx.set_metadata("table", table_name.clone()); + let database = Arc::clone(&database); async move { let chunks = database - .chunks(table_name, predicate, ctx) + .chunks(table_name, predicate, ctx.child_ctx("table chunks")) .await .context(GettingChunksSnafu { table_name })?; @@ -1367,7 +1364,9 @@ where as Result)>> }) .and_then(|(table_name, predicate, chunks)| { - let ctx = &ctx; + let mut ctx = ctx.child_ctx("table"); + ctx.set_metadata("table", table_name.to_owned()); + let database = Arc::clone(&database); let f = f.clone(); @@ -1376,7 +1375,7 @@ where .table_schema(table_name) .context(TableRemovedSnafu { table_name })?; - f(ctx, table_name, predicate, chunks, schema) + f(&ctx, table_name, predicate, chunks, schema) } }) .try_collect() @@ -1779,7 +1778,7 @@ mod tests { run_test(|test_db, rpc_predicate| { async move { InfluxRpcPlanner::new(IOxSessionContext::with_testing()) - .tag_keys(test_db.as_ref(), rpc_predicate) + .tag_keys(test_db, rpc_predicate) .await .expect("creating plan"); } diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index f977200ce8..da21ca52d7 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -31,7 +31,7 @@ async fn run_tag_keys_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .tag_keys(db.as_query_database(), predicate.clone()) + .tag_keys(db.as_query_database_arc(), predicate.clone()) .await .expect("built plan successfully"); let names = ctx diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index fc34586fee..07ba12518b 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -80,7 +80,7 @@ impl Planner { self.ctx .run(async move { planner - .tag_keys(database.as_ref(), predicate) + .tag_keys(database, predicate) .await .map_err(|e| Error::Plan(format!("tag_keys error: {}", e))) })