diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index b0f46a5654..f7258c5a28 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -120,13 +120,6 @@ impl Chunk { Ok(()) } - // Add the table name in this chunk to `names` if it is not already present - pub fn all_table_names(&self, names: &mut BTreeSet) { - if !names.contains(self.table_name.as_ref()) { - names.insert(self.table_name.to_string()); - } - } - /// Returns a queryable snapshot of this chunk #[cfg(not(feature = "nocache"))] pub fn snapshot(&self) -> Arc { diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs index ede48a42ab..85ae359430 100644 --- a/mutable_buffer/src/chunk/snapshot.rs +++ b/mutable_buffer/src/chunk/snapshot.rs @@ -72,11 +72,6 @@ impl ChunkSnapshot { self.batch.num_rows() == 0 } - /// Return true if this snapshot has the specified table name - pub fn has_table(&self, table_name: &str) -> bool { - self.table_name.as_ref() == table_name - } - /// Return Schema for the specified table / columns pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result { // Temporary #1295 @@ -102,7 +97,7 @@ impl ChunkSnapshot { /// Returns a list of tables with writes matching the given timestamp_range pub fn table_names(&self, timestamp_range: Option) -> BTreeSet { let mut ret = BTreeSet::new(); - if self.matches_predicate(×tamp_range) { + if self.has_timerange(×tamp_range) { ret.insert(self.table_name.to_string()); } ret @@ -222,7 +217,7 @@ impl ChunkSnapshot { self.batch.num_rows() } - fn matches_predicate(&self, timestamp_range: &Option) -> bool { + pub fn has_timerange(&self, timestamp_range: &Option) -> bool { let timestamp_range = match timestamp_range { Some(t) => t, None => return true, diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index fcceaf876c..f738c738b4 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -134,14 +134,9 @@ impl Chunk { self.table.full_schema() } - // Return all tables of this chunk whose timestamp overlaps with the give one - pub fn table_names( - &self, - timestamp_range: Option, - ) -> impl Iterator + '_ { - std::iter::once(&self.table) - .filter(move |table| table.matches_predicate(×tamp_range)) - .map(|table| table.name().to_string()) + // Return true if the table in this chunk contains values within the time range + pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool { + self.table.matches_predicate(timestamp_range) } // Return the columns names that belong to the given column diff --git a/parquet_file/src/table.rs b/parquet_file/src/table.rs index 36c6a2a094..28276a951b 100644 --- a/parquet_file/src/table.rs +++ b/parquet_file/src/table.rs @@ -110,7 +110,7 @@ impl Table { } // Check if 2 time ranges overlap - pub fn matches_predicate(&self, timestamp_range: &Option) -> bool { + pub fn matches_predicate(&self, timestamp_range: Option<&TimestampRange>) -> bool { match (self.timestamp_range, timestamp_range) { (Some(a), Some(b)) => !a.disjoint(b), (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 96affa1e98..3e0f7e3063 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -19,7 +19,7 @@ use observability_deps::tracing::debug; use snafu::{ensure, ResultExt, Snafu}; use crate::{ - exec::{field::FieldColumns, make_schema_pivot, stringset::StringSet}, + exec::{field::FieldColumns, make_schema_pivot}, func::{ selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput}, window::make_window_bound_expr, @@ -30,7 +30,7 @@ use crate::{ seriesset::{SeriesSetPlan, SeriesSetPlans}, stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, }, - predicate::{Predicate, PredicateBuilder}, + predicate::{Predicate, PredicateMatch}, provider::ProviderBuilder, util::schema_has_all_expr_columns, Database, PartitionChunk, @@ -196,21 +196,31 @@ impl InfluxRpcPlanner { let mut builder = StringSetPlanBuilder::new(); for chunk in database.chunks(&predicate) { - let new_table_names = chunk - .table_names(&predicate, builder.known_strings()) + // Try and apply the predicate using only metadata + let pred_result = chunk + .apply_predicate(&predicate) .map_err(|e| Box::new(e) as _) - .context(TableNamePlan)?; + .context(CheckingChunkPredicate { + chunk_id: chunk.id(), + })?; - builder = match new_table_names { - Some(new_table_names) => builder.append(new_table_names.into()), - None => { + builder = match pred_result { + PredicateMatch::AtLeastOne => builder.append_table(chunk.table_name()), + // no match, ignore table + PredicateMatch::Zero => builder, + // can't evaluate predicate, need a new plan + PredicateMatch::Unknown => { // TODO: General purpose plans for - // table_names. For now, if we couldn't figure out - // the table names from only metadata, generate an - // error + // table_names. For now, return an error + debug!( + chunk = chunk.id(), + ?predicate, + table_name = chunk.table_name(), + "can not evaluate predicate" + ); return UnsupportedPredicateForTableNames { predicate }.fail(); } - } + }; } let plan = builder.build().context(CreatingStringSet)?; @@ -241,51 +251,60 @@ impl InfluxRpcPlanner { let mut known_columns = BTreeSet::new(); for chunk in database.chunks(&predicate) { - // try and get the table names that have rows that match the predicate - let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; + // Try and apply the predicate using only metadata + let pred_result = chunk + .apply_predicate(&predicate) + .map_err(|e| Box::new(e) as _) + .context(CheckingChunkPredicate { + chunk_id: chunk.id(), + })?; - for table_name in table_names { - debug!( - table_name = table_name.as_str(), - chunk_id = chunk.id(), - "finding columns in table" - ); + if matches!(pred_result, PredicateMatch::Zero) { + continue; + } + let table_name = chunk.table_name(); + let chunk_id = chunk.id(); + debug!(table_name, chunk_id, "finding columns in table"); - // get only tag columns from metadata - let schema = chunk - .table_schema(Selection::All) - .expect("to be able to get table schema"); - let column_names: Vec<&str> = schema - .tags_iter() - .map(|f| f.name().as_str()) - .collect::>(); + // get only tag columns from metadata + let schema = chunk + .table_schema(Selection::All) + .map_err(|e| Box::new(e) as _) + .context(GettingTableSchema { + table_name, + chunk_id, + })?; - let selection = Selection::Some(&column_names); + let column_names: Vec<&str> = schema + .tags_iter() + .map(|f| f.name().as_str()) + .collect::>(); - // filter the columns further from the predicate - let maybe_names = chunk - .column_names(&predicate, selection) - .map_err(|e| Box::new(e) as _) - .context(FindingColumnNames)?; + let selection = Selection::Some(&column_names); - match maybe_names { - Some(mut names) => { - debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata"); - known_columns.append(&mut names); - } - None => { - debug!( - table_name = table_name.as_str(), - chunk_id = chunk.id(), - "column names need full plan" - ); - // can't get columns only from metadata, need - // a general purpose plan - need_full_plans - .entry(table_name) - .or_insert_with(Vec::new) - .push(Arc::clone(&chunk)); - } + // filter the columns further from the predicate + let maybe_names = chunk + .column_names(&predicate, selection) + .map_err(|e| Box::new(e) as _) + .context(FindingColumnNames)?; + + match maybe_names { + Some(mut names) => { + debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata"); + known_columns.append(&mut names); + } + None => { + debug!( + table_name, + chunk_id = chunk.id(), + "column names need full plan" + ); + // can't get columns only from metadata, need + // a general purpose plan + need_full_plans + .entry(table_name.to_string()) + .or_insert_with(Vec::new) + .push(Arc::clone(&chunk)); } } } @@ -346,70 +365,79 @@ impl InfluxRpcPlanner { let mut known_values = BTreeSet::new(); for chunk in database.chunks(&predicate) { - let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; + // Try and apply the predicate using only metadata + let pred_result = chunk + .apply_predicate(&predicate) + .map_err(|e| Box::new(e) as _) + .context(CheckingChunkPredicate { + chunk_id: chunk.id(), + })?; - for table_name in table_names { - debug!( - table_name = table_name.as_str(), - chunk_id = chunk.id(), - "finding columns in table" - ); + if matches!(pred_result, PredicateMatch::Zero) { + continue; + } + let table_name = chunk.table_name(); + let chunk_id = chunk.id(); + debug!(table_name, chunk_id, "finding columns in table"); - // use schema to validate column type - let schema = chunk - .table_schema(Selection::All) - .expect("to be able to get table schema"); + // use schema to validate column type + let schema = chunk + .table_schema(Selection::All) + .map_err(|e| Box::new(e) as _) + .context(GettingTableSchema { + table_name, + chunk_id, + })?; - // Skip this table if the tag_name is not a column in this table - let idx = if let Some(idx) = schema.find_index_of(tag_name) { - idx - } else { - continue; - }; + // Skip this table if the tag_name is not a column in this table + let idx = if let Some(idx) = schema.find_index_of(tag_name) { + idx + } else { + continue; + }; - // Validate that this really is a Tag column - let (influx_column_type, field) = schema.field(idx); - ensure!( - matches!(influx_column_type, Some(InfluxColumnType::Tag)), - InvalidTagColumn { - tag_name, - influx_column_type, - } - ); - ensure!( - influx_column_type - .unwrap() - .valid_arrow_type(field.data_type()), - InternalInvalidTagType { - tag_name, - data_type: field.data_type().clone(), - } - ); + // Validate that this really is a Tag column + let (influx_column_type, field) = schema.field(idx); + ensure!( + matches!(influx_column_type, Some(InfluxColumnType::Tag)), + InvalidTagColumn { + tag_name, + influx_column_type, + } + ); + ensure!( + influx_column_type + .unwrap() + .valid_arrow_type(field.data_type()), + InternalInvalidTagType { + tag_name, + data_type: field.data_type().clone(), + } + ); - // try and get the list of values directly from metadata - let maybe_values = chunk - .column_values(tag_name, &predicate) - .map_err(|e| Box::new(e) as _) - .context(FindingColumnValues)?; + // try and get the list of values directly from metadata + let maybe_values = chunk + .column_values(tag_name, &predicate) + .map_err(|e| Box::new(e) as _) + .context(FindingColumnValues)?; - match maybe_values { - Some(mut names) => { - debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata"); - known_values.append(&mut names); - } - None => { - debug!( - table_name = table_name.as_str(), - chunk_id = chunk.id(), - "need full plan to find column values" - ); - // can't get columns only from metadata, need - // a general purpose plan - need_full_plans - .entry(table_name) - .or_insert_with(Vec::new) - .push(Arc::clone(&chunk)); - } + match maybe_values { + Some(mut names) => { + debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata"); + known_values.append(&mut names); + } + None => { + debug!( + table_name, + chunk_id = chunk.id(), + "need full plan to find column values" + ); + // can't get columns only from metadata, need + // a general purpose plan + need_full_plans + .entry(table_name.to_string()) + .or_insert_with(Vec::new) + .push(Arc::clone(&chunk)); } } } @@ -609,7 +637,7 @@ impl InfluxRpcPlanner { Ok(ss_plans.into()) } - /// Creates a map of table_name --> Chunks that have that table + /// Creates a map of table_name --> Chunks that have that table that *may* pass the predicate fn group_chunks_by_table( &self, predicate: &Predicate, @@ -620,56 +648,32 @@ impl InfluxRpcPlanner { { let mut table_chunks = BTreeMap::new(); for chunk in chunks { - let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; - for table_name in table_names { - table_chunks - .entry(table_name) - .or_insert_with(Vec::new) - .push(Arc::clone(&chunk)); + // Try and apply the predicate using only metadata + let pred_result = chunk + .apply_predicate(&predicate) + .map_err(|e| Box::new(e) as _) + .context(CheckingChunkPredicate { + chunk_id: chunk.id(), + })?; + + match pred_result { + PredicateMatch::AtLeastOne | + // have to include chunk as we can't rule it out + PredicateMatch::Unknown => { + let table_name = chunk.table_name().to_string(); + table_chunks + .entry(table_name) + .or_insert_with(Vec::new) + .push(Arc::clone(&chunk)); + } + // Skip chunk here based on metadata + PredicateMatch::Zero => { + } } } Ok(table_chunks) } - /// Find all the table names in the specified chunk that pass the predicate - fn chunk_table_names(&self, chunk: &C, predicate: &Predicate) -> Result> - where - C: PartitionChunk + 'static, - { - let no_tables = StringSet::new(); - - // try and get the table names that have rows that match the predicate - let table_names = chunk - .table_names(&predicate, &no_tables) - .map_err(|e| Box::new(e) as _) - .context(TableNamePlan)?; - - debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables"); - - let table_names = match table_names { - Some(table_names) => { - debug!("found table names with original predicate"); - table_names - } - None => { - // couldn't find table names with predicate, get all chunk tables, - // fall back to filtering ourself - let table_name_predicate = if let Some(table_names) = &predicate.table_names { - PredicateBuilder::new().tables(table_names).build() - } else { - Predicate::default() - }; - chunk - .table_names(&table_name_predicate, &no_tables) - .map_err(|e| Box::new(e) as _) - .context(InternalTableNamePlanForDefault)? - // unwrap the Option (table didn't match) - .unwrap_or(no_tables) - } - }; - Ok(table_names) - } - /// Creates a DataFusion LogicalPlan that returns column *names* as a /// single column of Strings for a specific table /// @@ -1109,11 +1113,11 @@ impl InfluxRpcPlanner { let chunk_id = chunk.id(); // check that it is consistent with this table_name - assert!( - chunk.has_table(table_name), - "Chunk {} did not have table {}, while trying to make a plan for it", + assert_eq!( + chunk.table_name(), + table_name, + "Chunk {} expected table mismatch", chunk.id(), - table_name ); let chunk_table_schema = chunk diff --git a/query/src/lib.rs b/query/src/lib.rs index 2ff789ba87..0d1289cdaf 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -11,6 +11,7 @@ use data_types::chunk_metadata::ChunkSummary; use datafusion::physical_plan::SendableRecordBatchStream; use exec::{stringset::StringSet, Executor}; use internal_types::{schema::Schema, selection::Selection}; +use predicate::PredicateMatch; use std::{fmt::Debug, sync::Arc}; @@ -59,32 +60,16 @@ pub trait PartitionChunk: Debug + Send + Sync { /// particular partition. fn id(&self) -> u32; - /// Returns true if this chunk contains data for the specified table - fn has_table(&self, table_name: &str) -> bool; + /// Returns the name of the table stored in this chunk + fn table_name(&self) -> &str; - /// Returns all table names from this chunk that have at least one - /// row that matches the `predicate` and are not already in `known_tables`. + /// Returns the result of applying the `predicate` to the chunk + /// using an efficient, but inexact method, based on metadata. /// - /// If the predicate cannot be evaluated (e.g it has predicates - /// that cannot be directly evaluated in the chunk), `None` is - /// returned. - /// - /// `known_tables` is a list of table names already known to be in - /// other chunks from the same partition. It may be empty or - /// contain `table_names` not in this chunk. - fn table_names( - &self, - predicate: &Predicate, - known_tables: &StringSet, - ) -> Result, Self::Error>; - - /// Adds all table names from this chunk without any predicate to - /// `known_tables) - /// - /// `known_tables` is a list of table names already known to be in - /// other chunks from the same partition. It may be empty or - /// contain `table_names` not in this chunk. - fn all_table_names(&self, known_tables: &mut StringSet); + /// NOTE: This method is suitable for calling during planning, and + /// may return PredicateMatch::Unknown for certain types of + /// predicates. + fn apply_predicate(&self, predicate: &Predicate) -> Result; /// Returns a set of Strings with column names from the specified /// table that have at least one row that matches `predicate`, if diff --git a/query/src/plan/stringset.rs b/query/src/plan/stringset.rs index 364d42cc01..f5194f8d48 100644 --- a/query/src/plan/stringset.rs +++ b/query/src/plan/stringset.rs @@ -101,6 +101,14 @@ impl StringSetPlanBuilder { &self.strings } + /// Append the name of a table to the known strings + pub fn append_table(mut self, table_name: &str) -> Self { + if !self.strings.contains(table_name) { + self.strings.insert(table_name.to_string()); + } + self + } + /// Append the strings from the passed plan into ourselves if possible, or /// passes on the plan pub fn append(mut self, other: StringSetPlan) -> Self { diff --git a/query/src/predicate.rs b/query/src/predicate.rs index dbf03047eb..45a86dc2f0 100644 --- a/query/src/predicate.rs +++ b/query/src/predicate.rs @@ -28,6 +28,20 @@ pub const EMPTY_PREDICATE: Predicate = Predicate { partition_key: None, }; +#[derive(Debug, Clone, Copy)] +/// The result of evaluating a predicate on a set of rows +pub enum PredicateMatch { + /// There is at least one row that matches the predicate + AtLeastOne, + + /// There are exactly zero rows that match the predicate + Zero, + + /// There *may* be rows that match, OR there *may* be no rows that + /// match + Unknown, +} + /// Represents a parsed predicate for evaluation by the /// TSDatabase InfluxDB IOx query engine. /// diff --git a/query/src/test.rs b/query/src/test.rs index 2b45c7da19..7486e352ad 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -14,7 +14,7 @@ use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBa use crate::exec::Executor; use crate::{ exec::stringset::{StringSet, StringSetRef}, - Database, DatabaseStore, PartitionChunk, Predicate, + Database, DatabaseStore, PartitionChunk, Predicate, PredicateMatch, }; use internal_types::{ @@ -133,6 +133,9 @@ pub struct TestChunk { /// A saved error that is returned instead of actual results saved_error: Option, + + /// Return value for apply_predicate, if desired + predicate_match: Option, } impl TestChunk { @@ -150,6 +153,12 @@ impl TestChunk { self } + /// specify that any call to apply_predicate should return this value + pub fn with_predicate_match(mut self, predicate_match: PredicateMatch) -> Self { + self.predicate_match = Some(predicate_match); + self + } + /// Checks the saved error, and returns it if any, otherwise returns OK fn check_error(&self) -> Result<()> { if let Some(message) = self.saved_error.as_ref() { @@ -307,6 +316,10 @@ impl PartitionChunk for TestChunk { self.id } + fn table_name(&self) -> &str { + self.table_name.as_deref().unwrap() + } + fn read_filter( &self, predicate: &Predicate, @@ -322,29 +335,31 @@ impl PartitionChunk for TestChunk { Ok(Box::pin(stream)) } - fn table_names( - &self, - predicate: &Predicate, - _known_tables: &StringSet, - ) -> Result, Self::Error> { + fn apply_predicate(&self, predicate: &Predicate) -> Result { self.check_error()?; // save the predicate self.predicates.lock().push(predicate.clone()); - // do basic filtering based on table name predicate. + // check if there is a saved result to return + if let Some(&predicate_match) = self.predicate_match.as_ref() { + return Ok(predicate_match); + } - Ok(self + // otherwise fall back to basic filtering based on table name predicate. + let predicate_match = self .table_name .as_ref() - .filter(|table_name| predicate.should_include_table(&table_name)) - .map(|table_name| std::iter::once(table_name.to_string()).collect::())) - } + .map(|table_name| { + if !predicate.should_include_table(&table_name) { + PredicateMatch::Zero + } else { + PredicateMatch::Unknown + } + }) + .unwrap_or(PredicateMatch::Unknown); - fn all_table_names(&self, known_tables: &mut StringSet) { - if let Some(table_name) = self.table_name.as_ref() { - known_tables.insert(table_name.to_string()); - } + Ok(predicate_match) } fn table_schema(&self, selection: Selection<'_>) -> Result { @@ -366,13 +381,6 @@ impl PartitionChunk for TestChunk { Ok(None) } - fn has_table(&self, table_name: &str) -> bool { - self.table_name - .as_ref() - .map(|n| n == table_name) - .unwrap_or(false) - } - fn column_names( &self, predicate: &Predicate, diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index c8c3e0543a..5d18d50f39 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -280,14 +280,7 @@ impl Chunk { chunk: Arc, metrics: ChunkMetrics, ) -> Self { - // workaround until https://github.com/influxdata/influxdb_iox/issues/1295 is fixed - let table_name = Arc::from( - chunk - .table_names(None) - .next() - .expect("chunk must have exactly 1 table") - .as_ref(), - ); + let table_name = Arc::from(chunk.table_name()); // Cache table summary + schema let meta = Arc::new(ChunkMetadata { diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 8bacfecdfc..ae043a234c 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -133,11 +133,7 @@ impl Partition { chunk_id: u32, chunk: Arc, ) -> Result>> { - // workaround until https://github.com/influxdata/influxdb_iox/issues/1295 is fixed - let table_name = chunk - .table_names(None) - .next() - .expect("chunk must have exactly 1 table"); + let table_name = chunk.table_name().to_string(); let chunk = Arc::new(self.metrics.new_lock(Chunk::new_object_store_only( chunk_id, diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index ef73642749..feb2d35a34 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -12,7 +12,11 @@ use mutable_buffer::chunk::snapshot::ChunkSnapshot; use object_store::path::Path; use observability_deps::tracing::debug; use parquet_file::chunk::Chunk as ParquetChunk; -use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk}; +use query::{ + exec::stringset::StringSet, + predicate::{Predicate, PredicateMatch}, + PartitionChunk, +}; use read_buffer::Chunk as ReadBufferChunk; use super::{ @@ -196,26 +200,35 @@ impl PartitionChunk for DbChunk { self.id } - fn all_table_names(&self, known_tables: &mut StringSet) { - // TODO remove this function (use name from TableSummary directly!) - let table_name = &self.meta.table_summary.name; - if !known_tables.contains(table_name) { - known_tables.insert(table_name.to_string()); - } + fn table_name(&self) -> &str { + self.table_name.as_ref() } - fn table_names( - &self, - predicate: &Predicate, - _known_tables: &StringSet, // TODO: Should this be being used? - ) -> Result, Self::Error> { - let names = match &self.state { + fn apply_predicate(&self, predicate: &Predicate) -> Result { + if !predicate.should_include_table(self.table_name().as_ref()) { + return Ok(PredicateMatch::Zero); + } + + // TODO apply predicate pruning here... + + let pred_result = match &self.state { State::MutableBuffer { chunk, .. } => { if predicate.has_exprs() { // TODO: Support more predicates - return Ok(None); + PredicateMatch::Unknown + } else if chunk.has_timerange(&predicate.range) { + // Note: this isn't precise / correct: if the + // chunk has the timerange, some other part of the + // predicate may rule out the rows, and thus + // without further work this clause should return + // "Unknown" rather than falsely claiming that + // there is at least one row: + // + // https://github.com/influxdata/influxdb_iox/issues/1590 + PredicateMatch::AtLeastOne + } else { + PredicateMatch::Zero } - chunk.table_names(predicate.range) } State::ReadBuffer { chunk, .. } => { // If not supported, ReadBuffer can't answer with @@ -224,23 +237,35 @@ impl PartitionChunk for DbChunk { Ok(rb_predicate) => rb_predicate, Err(e) => { debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back"); - return Ok(None); + return Ok(PredicateMatch::Unknown); } }; - chunk.table_names(&rb_predicate, &BTreeSet::new()) + // TODO align API in read_buffer + let table_names = chunk.table_names(&rb_predicate, &BTreeSet::new()); + if !table_names.is_empty() { + // As above, this should really be "Unknown" rather than AtLeastOne + // for precision / correctness. + PredicateMatch::AtLeastOne + } else { + PredicateMatch::Zero + } + } + State::ParquetFile { chunk, .. } => { + if predicate.has_exprs() { + // TODO: Support more predicates + PredicateMatch::Unknown + } else if chunk.has_timerange(predicate.range.as_ref()) { + // As above, this should really be "Unknown" rather than AtLeastOne + // for precision / correctness. + PredicateMatch::AtLeastOne + } else { + PredicateMatch::Zero + } } - State::ParquetFile { chunk, .. } => chunk.table_names(predicate.range).collect(), }; - // Prune out tables that should not be - // present (based on additional table restrictions of the Predicate) - Ok(Some( - names - .into_iter() - .filter(|table_name| predicate.should_include_table(table_name)) - .collect(), - )) + Ok(pred_result) } fn table_schema(&self, selection: Selection<'_>) -> Result { @@ -253,10 +278,6 @@ impl PartitionChunk for DbChunk { }) } - fn has_table(&self, table_name: &str) -> bool { - table_name == self.meta.table_summary.name - } - fn read_filter( &self, predicate: &Predicate, diff --git a/server/src/query_tests/table_schema.rs b/server/src/query_tests/table_schema.rs index 9ff765e171..fee6487b64 100644 --- a/server/src/query_tests/table_schema.rs +++ b/server/src/query_tests/table_schema.rs @@ -36,7 +36,7 @@ macro_rules! run_table_schema_test_case { let predicate = PredicateBuilder::new().table(table_name).build(); for chunk in db.chunks(&predicate) { - if chunk.has_table(table_name) { + if chunk.table_name().as_ref() == table_name { chunks_with_table += 1; let actual_schema = chunk.table_schema(selection.clone()).unwrap(); diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index 06f3f973ae..f8a58fcfd9 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -1171,7 +1171,7 @@ mod tests { use super::*; use datafusion::logical_plan::{col, lit, Expr}; use panic_logging::SendPanicsToTracing; - use query::{test::TestChunk, test::TestDatabaseStore}; + use query::{predicate::PredicateMatch, test::TestChunk, test::TestDatabaseStore}; use std::{ convert::TryFrom, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -1252,8 +1252,13 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk0 = TestChunk::new(0).with_table("h2o"); - let chunk1 = TestChunk::new(1).with_table("o2"); + let chunk0 = TestChunk::new(0) + .with_predicate_match(PredicateMatch::AtLeastOne) + .with_table("h2o"); + + let chunk1 = TestChunk::new(1) + .with_predicate_match(PredicateMatch::AtLeastOne) + .with_table("o2"); fixture .test_storage @@ -1409,7 +1414,9 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + let chunk = TestChunk::new(0) + .with_table("my_table") + .with_error("Sugar we are going down"); fixture .test_storage @@ -1535,6 +1542,7 @@ mod tests { let chunk = TestChunk::new(0) // predicate specifies m4, so this is filtered out + .with_table("my_table") .with_error("This is an error"); fixture @@ -1640,7 +1648,9 @@ mod tests { tag_key: [0].into(), }; - let chunk = TestChunk::new(0).with_table("h2o"); + let chunk = TestChunk::new(0) + .with_predicate_match(PredicateMatch::AtLeastOne) + .with_table("h2o"); fixture .test_storage @@ -1717,7 +1727,9 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + let chunk = TestChunk::new(0) + .with_table("my_table") + .with_error("Sugar we are going down"); fixture .test_storage @@ -1835,7 +1847,9 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + let chunk = TestChunk::new(0) + .with_table("my_table") + .with_error("Sugar we are going down"); fixture .test_storage @@ -1985,7 +1999,9 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + let chunk = TestChunk::new(0) + .with_table("my_table") + .with_error("Sugar we are going down"); fixture .test_storage @@ -2077,7 +2093,9 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + let chunk = TestChunk::new(0) + .with_table("my_table") + .with_error("Sugar we are going down"); fixture .test_storage @@ -2287,7 +2305,9 @@ mod tests { let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + let chunk = TestChunk::new(0) + .with_table("my_table") + .with_error("Sugar we are going down"); fixture .test_storage