refactor: concurrent table scan for "tag values" (#5671)

Ref #5668.
pull/24376/head
Marco Neumann 2022-09-19 16:11:51 +02:00 committed by GitHub
parent 274bd80ecd
commit 7e00426d49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 117 deletions

View File

@ -31,11 +31,7 @@ use query_functions::{
};
use schema::{selection::Selection, InfluxColumnType, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
cmp::Reverse,
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use std::{cmp::Reverse, collections::BTreeSet, sync::Arc};
const CONCURRENT_TABLE_JOBS: usize = 10;
@ -458,7 +454,7 @@ impl InfluxRpcPlanner {
/// the conditions specified by `predicate`.
pub async fn tag_values(
&self,
database: &dyn QueryDatabase,
database: Arc<dyn QueryDatabase>,
tag_name: &str,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
@ -473,107 +469,122 @@ impl InfluxRpcPlanner {
// distinct values that can be found from only metadata and
// which need full plans
// Key is table name, value is set of chunks which had data
// for that table but that we couldn't evaluate the predicate
// entirely using the metadata
let mut need_full_plans = BTreeMap::new();
let mut known_values = BTreeSet::new();
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.context(CreatingPredicatesSnafu)?;
for (table_name, predicate) in &table_predicates {
let chunks = database
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
for chunk in cheap_chunk_first(chunks) {
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
// deleted data before getting tag values
let mut do_full_plan = chunk.has_delete_predicates();
// Try and apply the predicate using only metadata
let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
},
)?;
// filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk scan (which includes
// ingester RPC)
let mut table_predicates_filtered = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in table_predicates {
let schema = database
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: &table_name,
})?;
if matches!(pred_result, PredicateMatch::Zero) {
continue;
}
// Skip this table if the tag_name is not a column in this table
if schema.find_index_of(tag_name).is_none() {
continue;
};
// use schema to validate column type
let schema = chunk.schema();
table_predicates_filtered.push((table_name, predicate));
}
// Skip this table if the tag_name is not a column in this table
let idx = if let Some(idx) = schema.find_index_of(tag_name) {
idx
} else {
continue;
};
let tables: Vec<_> =
table_chunk_stream(Arc::clone(&database), &table_predicates_filtered, &ctx)
.and_then(|(table_name, predicate, chunks)| async move {
let mut chunks_full = vec![];
let mut known_values = BTreeSet::new();
// Validate that this really is a Tag column
let (influx_column_type, field) = schema.field(idx);
ensure!(
matches!(influx_column_type, Some(InfluxColumnType::Tag)),
InvalidTagColumnSnafu {
tag_name,
influx_column_type,
}
);
ensure!(
influx_column_type
.unwrap()
.valid_arrow_type(field.data_type()),
InternalInvalidTagTypeSnafu {
tag_name,
data_type: field.data_type().clone(),
}
);
for chunk in cheap_chunk_first(chunks) {
// Try and apply the predicate using only metadata
let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
},
)?;
if !do_full_plan {
// try and get the list of values directly from metadata
let maybe_values = chunk
.column_values(
self.ctx.child_ctx("tag_values execution"),
tag_name,
predicate,
)
.context(FindingColumnValuesSnafu)?;
if matches!(pred_result, PredicateMatch::Zero) {
continue;
}
match maybe_values {
Some(mut names) => {
// use schema to validate column type
let schema = chunk.schema();
// Skip this table if the tag_name is not a column in this chunk
// Note: This may happen even when the table contains the tag_name, because some chunks may not
// contain all columns.
let idx = if let Some(idx) = schema.find_index_of(tag_name) {
idx
} else {
continue;
};
// Validate that this really is a Tag column
let (influx_column_type, field) = schema.field(idx);
ensure!(
matches!(influx_column_type, Some(InfluxColumnType::Tag)),
InvalidTagColumnSnafu {
tag_name,
influx_column_type,
}
);
ensure!(
influx_column_type
.unwrap()
.valid_arrow_type(field.data_type()),
InternalInvalidTagTypeSnafu {
tag_name,
data_type: field.data_type().clone(),
}
);
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
// deleted data before getting tag values
if chunk.has_delete_predicates() {
debug!(
%table_name,
names=?names,
chunk_id=%chunk.id().get(),
"tag values found from metadata",
"need full plan to find tag values"
);
known_values.append(&mut names);
}
None => {
do_full_plan = true;
chunks_full.push(chunk);
} else {
// try and get the list of values directly from metadata
let mut ctx = self.ctx.child_ctx("tag_values execution");
ctx.set_metadata("table", table_name.to_owned());
let maybe_values = chunk
.column_values(ctx, tag_name, predicate)
.context(FindingColumnValuesSnafu)?;
match maybe_values {
Some(mut names) => {
debug!(
%table_name,
names=?names,
chunk_id=%chunk.id().get(),
"tag values found from metadata",
);
known_values.append(&mut names);
}
None => {
debug!(
%table_name,
chunk_id=%chunk.id().get(),
"need full plan to find tag values"
);
chunks_full.push(chunk);
}
}
}
}
}
// can't get columns only from metadata, need
// a general purpose plan
if do_full_plan {
debug!(
%table_name,
chunk_id=%chunk.id().get(),
"need full plan to find tag values"
);
need_full_plans
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
}
}
Ok((table_name, predicate, chunks_full, known_values))
})
.try_collect()
.await?;
let mut builder = StringSetPlanBuilder::new();
@ -582,17 +593,21 @@ impl InfluxRpcPlanner {
// At this point, we have a set of tag_values we know at plan
// time in `known_columns`, and some tables in chunks that we
// need to run a plan to find what values pass the predicate.
for (table_name, predicate) in &table_predicates {
if let Some(chunks) = need_full_plans.remove(table_name) {
for (table_name, predicate, chunks_full, known_values) in tables {
builder = builder.append_other(known_values.into());
if !chunks_full.is_empty() {
let schema = database
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
let scan_and_filter =
ScanPlanBuilder::new(schema, ctx.child_ctx("scan_and_filter planning"))
.with_chunks(chunks)
.with_predicate(predicate)
.build()?;
let mut ctx = ctx.child_ctx("scan_and_filter planning");
ctx.set_metadata("table", table_name.to_owned());
let scan_and_filter = ScanPlanBuilder::new(schema, ctx)
.with_chunks(chunks_full)
.with_predicate(predicate)
.build()?;
let tag_name_is_not_null = Expr::Column(tag_name.into()).is_not_null();
@ -619,11 +634,7 @@ impl InfluxRpcPlanner {
}
}
// add the known values we could find from metadata only
builder
.append_other(known_values.into())
.build()
.context(CreatingStringSetSnafu)
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a plan that produces a list of columns and their
@ -1792,7 +1803,7 @@ mod tests {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new(IOxSessionContext::with_testing())
.tag_values(test_db.as_ref(), "foo", rpc_predicate)
.tag_values(test_db, "foo", rpc_predicate)
.await
.expect("creating plan");
}

View File

@ -13,11 +13,6 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
fn as_catalog_provider_arc(self: Arc<Self>) -> Arc<dyn CatalogProvider>;
/// Upcast to [`QueryDatabase`].
///
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
fn as_query_database(&self) -> &dyn QueryDatabase;
/// Upcast to [`QueryDatabase`].
///
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
@ -33,10 +28,6 @@ impl AbstractDb for QuerierNamespace {
self as _
}
fn as_query_database(&self) -> &dyn QueryDatabase {
self
}
fn as_query_database_arc(self: Arc<Self>) -> Arc<dyn QueryDatabase> {
self
}

View File

@ -30,7 +30,7 @@ async fn run_tag_values_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_values(db.as_query_database(), tag_name, predicate.clone())
.tag_values(db.as_query_database_arc(), tag_name, predicate.clone())
.await
.expect("built plan successfully");
let names = ctx
@ -318,7 +318,7 @@ async fn list_tag_values_field_col_on_tag() {
let tag_name = "temp";
let plan_result = planner
.tag_values(
db.as_query_database(),
db.as_query_database_arc(),
tag_name,
InfluxRpcPredicate::default(),
)

View File

@ -104,7 +104,7 @@ impl Planner {
self.ctx
.run(async move {
planner
.tag_values(database.as_ref(), &tag_name, predicate)
.tag_values(database, &tag_name, predicate)
.await
.map_err(|e| Error::Plan(format!("tag_values error: {}", e)))
})

View File

@ -2120,7 +2120,9 @@ mod tests {
let db_info = org_and_bucket();
let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");
let chunk = TestChunk::new("my_table")
.with_tag_column("the_tag_key")
.with_error("Sugar we are going down");
fixture
.test_storage
@ -2488,7 +2490,9 @@ mod tests {
let db_info = org_and_bucket();
let chunk = TestChunk::new("m5").with_error("Sugar we are going down");
let chunk = TestChunk::new("m5")
.with_tag_column("the_tag_key")
.with_error("Sugar we are going down");
fixture
.test_storage