refactor: concurrent table scan for "tag keys" (#5670)

* refactor: concurrent table scan for "tag keys"

Ref #5668.

* feat: add table name to context metadata
pull/24376/head
Marco Neumann 2022-09-19 15:27:18 +02:00 committed by GitHub
parent ef09573255
commit 274bd80ecd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 123 deletions

View File

@ -286,13 +286,10 @@ impl InfluxRpcPlanner {
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
let plan = Self::table_name_plan(
ctx.child_ctx("table name plan"),
table_name,
schema,
predicate,
chunks,
)?;
let mut ctx = ctx.child_ctx("table name plan");
ctx.set_metadata("table", table_name.to_owned());
let plan = Self::table_name_plan(ctx, table_name, schema, predicate, chunks)?;
builder = builder.append_other(plan.into());
}
}
@ -307,7 +304,7 @@ impl InfluxRpcPlanner {
/// conditions specified by `predicate`.
pub async fn tag_keys(
&self,
database: &dyn QueryDatabase,
database: Arc<dyn QueryDatabase>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
let ctx = self.ctx.child_ctx("tag_keys planning");
@ -323,139 +320,137 @@ impl InfluxRpcPlanner {
// 2. For each table/chunk pair, figure out which can be found
// from only metadata and which need full plans
// Key is table name, value is set of chunks which had data
// for that table but that we couldn't evaluate the predicate
// entirely using the metadata
let mut need_full_plans = BTreeMap::new();
let mut known_columns = BTreeSet::new();
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.context(CreatingPredicatesSnafu)?;
for (table_name, predicate) in &table_predicates {
let mut table_predicates_need_chunks = vec![];
let mut builder = StringSetPlanBuilder::new();
for (table_name, predicate) in table_predicates {
if predicate.is_empty() {
// special case - return the columns from metadata only.
// Note that columns with all rows deleted will still show here
known_columns.extend(
builder = builder.append_other(
database
.table_schema(table_name)
.table_schema(&table_name)
.context(TableRemovedSnafu { table_name })?
.tags_iter()
.map(|f| f.name().clone()),
.map(|f| f.name().clone())
.collect::<BTreeSet<_>>()
.into(),
);
continue;
}
let chunks = database
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
for chunk in cheap_chunk_first(chunks) {
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
// deleted data before getting tag keys
let mut do_full_plan = chunk.has_delete_predicates();
// Try and apply the predicate using only metadata
let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
},
)?;
if matches!(pred_result, PredicateMatch::Zero) {
continue;
}
// get only tag columns from metadata
let schema = chunk.schema();
let column_names: Vec<&str> = schema
.tags_iter()
.map(|f| f.name().as_str())
.collect::<Vec<&str>>();
let selection = Selection::Some(&column_names);
if !do_full_plan {
// filter the columns further from the predicate
let maybe_names = chunk
.column_names(
ctx.child_ctx("column_names execution"),
predicate,
selection,
)
.context(FindingColumnNamesSnafu)?;
match maybe_names {
Some(mut names) => {
debug!(
%table_name,
names=?names,
chunk_id=%chunk.id().get(),
"column names found from metadata",
);
known_columns.append(&mut names);
}
None => {
do_full_plan = true;
}
}
}
// can't get columns only from metadata, need
// a general purpose plan
if do_full_plan {
debug!(
%table_name,
chunk_id=%chunk.id().get(),
"column names need full plan"
);
need_full_plans
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
} else {
table_predicates_need_chunks.push((table_name, predicate));
}
}
let mut builder = StringSetPlanBuilder::new();
let tables: Vec<_> =
table_chunk_stream(Arc::clone(&database), &table_predicates_need_chunks, &ctx)
.and_then(|(table_name, predicate, chunks)| {
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.to_owned());
async move {
let mut chunks_full = vec![];
let mut known_columns = BTreeSet::new();
for chunk in cheap_chunk_first(chunks) {
// Try and apply the predicate using only metadata
let pred_result = chunk
.apply_predicate_to_metadata(predicate)
.context(CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
})?;
if matches!(pred_result, PredicateMatch::Zero) {
continue;
}
// get only tag columns from metadata
let schema = chunk.schema();
let column_names: Vec<&str> = schema
.tags_iter()
.map(|f| f.name().as_str())
.collect::<Vec<&str>>();
let selection = Selection::Some(&column_names);
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
// deleted data before getting tag keys
if chunk.has_delete_predicates() {
debug!(
%table_name,
chunk_id=%chunk.id().get(),
"column names need full plan"
);
chunks_full.push(chunk);
} else {
// filter the columns further from the predicate
let maybe_names = chunk
.column_names(
ctx.child_ctx("column_names execution"),
predicate,
selection,
)
.context(FindingColumnNamesSnafu)?;
match maybe_names {
Some(mut names) => {
debug!(
%table_name,
names=?names,
chunk_id=%chunk.id().get(),
"column names found from metadata",
);
known_columns.append(&mut names);
}
None => {
debug!(
%table_name,
chunk_id=%chunk.id().get(),
"column names need full plan"
);
chunks_full.push(chunk);
}
}
}
}
Ok((table_name, predicate, chunks_full, known_columns))
}
})
.try_collect()
.await?;
// At this point, we have a set of column names we know pass
// in `known_columns`, and potentially some tables in chunks
// that we need to run a plan to know if they pass the
// predicate.
if !need_full_plans.is_empty() {
// TODO an additional optimization here would be to filter
// out chunks (and tables) where all columns in that chunk
// were already known to have data (based on the contents of known_columns)
for (table_name, predicate, chunks_full, known_columns) in tables {
builder = builder.append_other(known_columns.into());
for (table_name, predicate) in &table_predicates {
if let Some(chunks) = need_full_plans.remove(table_name) {
let schema = database
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
if !chunks_full.is_empty() {
// TODO an additional optimization here would be to filter
// out chunks (and tables) where all columns in that chunk
// were already known to have data (based on the contents of known_columns)
let plan = self.tag_keys_plan(
ctx.child_ctx("tag_keys_plan"),
table_name,
schema,
predicate,
chunks,
)?;
let schema = database
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
if let Some(plan) = plan {
builder = builder.append_other(plan)
}
let mut ctx = ctx.child_ctx("tag_keys_plan");
ctx.set_metadata("table", table_name.to_owned());
let plan = self.tag_keys_plan(ctx, table_name, schema, predicate, chunks_full)?;
if let Some(plan) = plan {
builder = builder.append_other(plan)
}
}
}
// add the known columns we could find from metadata only
builder
.append_other(known_columns.into())
.build()
.context(CreatingStringSetSnafu)
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a plan which finds the distinct, non-null tag values
@ -1312,12 +1307,14 @@ fn table_chunk_stream<'a>(
) -> impl Stream<Item = Result<(&'a str, &'a Predicate, Vec<Arc<dyn QueryChunk>>)>> + 'a {
futures::stream::iter(table_predicates)
.map(move |(table_name, predicate)| {
let ctx = ctx.child_ctx("table chunks");
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.clone());
let database = Arc::clone(&database);
async move {
let chunks = database
.chunks(table_name, predicate, ctx)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
@ -1367,7 +1364,9 @@ where
as Result<Option<(&str, &Predicate, Vec<_>)>>
})
.and_then(|(table_name, predicate, chunks)| {
let ctx = &ctx;
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.to_owned());
let database = Arc::clone(&database);
let f = f.clone();
@ -1376,7 +1375,7 @@ where
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
f(ctx, table_name, predicate, chunks, schema)
f(&ctx, table_name, predicate, chunks, schema)
}
})
.try_collect()
@ -1779,7 +1778,7 @@ mod tests {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new(IOxSessionContext::with_testing())
.tag_keys(test_db.as_ref(), rpc_predicate)
.tag_keys(test_db, rpc_predicate)
.await
.expect("creating plan");
}

View File

@ -31,7 +31,7 @@ async fn run_tag_keys_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_keys(db.as_query_database(), predicate.clone())
.tag_keys(db.as_query_database_arc(), predicate.clone())
.await
.expect("built plan successfully");
let names = ctx

View File

@ -80,7 +80,7 @@ impl Planner {
self.ctx
.run(async move {
planner
.tag_keys(database.as_ref(), predicate)
.tag_keys(database, predicate)
.await
.map_err(|e| Error::Plan(format!("tag_keys error: {}", e)))
})