refactor: Remove last vestiges of multi-table chunks from PartitionChunk API (#1588)

* refactor: Remove last vestiges of multi-table chunks from PartitionChunk API

* fix: remove test that can no longer fail

* fix: update tests + code review comments

* fix: clippy

* fix: clippy

* fix: restore test_measurement_fields_error test
pull/24376/head
Andrew Lamb 2021-06-01 12:12:33 -04:00 committed by GitHub
parent dbfb56b6c1
commit d8fbb7b410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 315 additions and 283 deletions

View File

@ -120,13 +120,6 @@ impl Chunk {
Ok(()) Ok(())
} }
// Add the table name in this chunk to `names` if it is not already present
pub fn all_table_names(&self, names: &mut BTreeSet<String>) {
if !names.contains(self.table_name.as_ref()) {
names.insert(self.table_name.to_string());
}
}
/// Returns a queryable snapshot of this chunk /// Returns a queryable snapshot of this chunk
#[cfg(not(feature = "nocache"))] #[cfg(not(feature = "nocache"))]
pub fn snapshot(&self) -> Arc<ChunkSnapshot> { pub fn snapshot(&self) -> Arc<ChunkSnapshot> {

View File

@ -72,11 +72,6 @@ impl ChunkSnapshot {
self.batch.num_rows() == 0 self.batch.num_rows() == 0
} }
/// Return true if this snapshot has the specified table name
pub fn has_table(&self, table_name: &str) -> bool {
self.table_name.as_ref() == table_name
}
/// Return Schema for the specified table / columns /// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> { pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
// Temporary #1295 // Temporary #1295
@ -102,7 +97,7 @@ impl ChunkSnapshot {
/// Returns a list of tables with writes matching the given timestamp_range /// Returns a list of tables with writes matching the given timestamp_range
pub fn table_names(&self, timestamp_range: Option<TimestampRange>) -> BTreeSet<String> { pub fn table_names(&self, timestamp_range: Option<TimestampRange>) -> BTreeSet<String> {
let mut ret = BTreeSet::new(); let mut ret = BTreeSet::new();
if self.matches_predicate(&timestamp_range) { if self.has_timerange(&timestamp_range) {
ret.insert(self.table_name.to_string()); ret.insert(self.table_name.to_string());
} }
ret ret
@ -222,7 +217,7 @@ impl ChunkSnapshot {
self.batch.num_rows() self.batch.num_rows()
} }
fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool { pub fn has_timerange(&self, timestamp_range: &Option<TimestampRange>) -> bool {
let timestamp_range = match timestamp_range { let timestamp_range = match timestamp_range {
Some(t) => t, Some(t) => t,
None => return true, None => return true,

View File

@ -134,14 +134,9 @@ impl Chunk {
self.table.full_schema() self.table.full_schema()
} }
// Return all tables of this chunk whose timestamp overlaps with the give one // Return true if the table in this chunk contains values within the time range
pub fn table_names( pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
&self, self.table.matches_predicate(timestamp_range)
timestamp_range: Option<TimestampRange>,
) -> impl Iterator<Item = String> + '_ {
std::iter::once(&self.table)
.filter(move |table| table.matches_predicate(&timestamp_range))
.map(|table| table.name().to_string())
} }
// Return the columns names that belong to the given column // Return the columns names that belong to the given column

View File

@ -110,7 +110,7 @@ impl Table {
} }
// Check if 2 time ranges overlap // Check if 2 time ranges overlap
pub fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool { pub fn matches_predicate(&self, timestamp_range: Option<&TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) { match (self.timestamp_range, timestamp_range) {
(Some(a), Some(b)) => !a.disjoint(b), (Some(a), Some(b)) => !a.disjoint(b),
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */

View File

@ -19,7 +19,7 @@ use observability_deps::tracing::debug;
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ensure, ResultExt, Snafu};
use crate::{ use crate::{
exec::{field::FieldColumns, make_schema_pivot, stringset::StringSet}, exec::{field::FieldColumns, make_schema_pivot},
func::{ func::{
selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput}, selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput},
window::make_window_bound_expr, window::make_window_bound_expr,
@ -30,7 +30,7 @@ use crate::{
seriesset::{SeriesSetPlan, SeriesSetPlans}, seriesset::{SeriesSetPlan, SeriesSetPlans},
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
}, },
predicate::{Predicate, PredicateBuilder}, predicate::{Predicate, PredicateMatch},
provider::ProviderBuilder, provider::ProviderBuilder,
util::schema_has_all_expr_columns, util::schema_has_all_expr_columns,
Database, PartitionChunk, Database, PartitionChunk,
@ -196,21 +196,31 @@ impl InfluxRpcPlanner {
let mut builder = StringSetPlanBuilder::new(); let mut builder = StringSetPlanBuilder::new();
for chunk in database.chunks(&predicate) { for chunk in database.chunks(&predicate) {
let new_table_names = chunk // Try and apply the predicate using only metadata
.table_names(&predicate, builder.known_strings()) let pred_result = chunk
.apply_predicate(&predicate)
.map_err(|e| Box::new(e) as _) .map_err(|e| Box::new(e) as _)
.context(TableNamePlan)?; .context(CheckingChunkPredicate {
chunk_id: chunk.id(),
})?;
builder = match new_table_names { builder = match pred_result {
Some(new_table_names) => builder.append(new_table_names.into()), PredicateMatch::AtLeastOne => builder.append_table(chunk.table_name()),
None => { // no match, ignore table
PredicateMatch::Zero => builder,
// can't evaluate predicate, need a new plan
PredicateMatch::Unknown => {
// TODO: General purpose plans for // TODO: General purpose plans for
// table_names. For now, if we couldn't figure out // table_names. For now, return an error
// the table names from only metadata, generate an debug!(
// error chunk = chunk.id(),
?predicate,
table_name = chunk.table_name(),
"can not evaluate predicate"
);
return UnsupportedPredicateForTableNames { predicate }.fail(); return UnsupportedPredicateForTableNames { predicate }.fail();
} }
} };
} }
let plan = builder.build().context(CreatingStringSet)?; let plan = builder.build().context(CreatingStringSet)?;
@ -241,51 +251,60 @@ impl InfluxRpcPlanner {
let mut known_columns = BTreeSet::new(); let mut known_columns = BTreeSet::new();
for chunk in database.chunks(&predicate) { for chunk in database.chunks(&predicate) {
// try and get the table names that have rows that match the predicate // Try and apply the predicate using only metadata
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; let pred_result = chunk
.apply_predicate(&predicate)
.map_err(|e| Box::new(e) as _)
.context(CheckingChunkPredicate {
chunk_id: chunk.id(),
})?;
for table_name in table_names { if matches!(pred_result, PredicateMatch::Zero) {
debug!( continue;
table_name = table_name.as_str(), }
chunk_id = chunk.id(), let table_name = chunk.table_name();
"finding columns in table" let chunk_id = chunk.id();
); debug!(table_name, chunk_id, "finding columns in table");
// get only tag columns from metadata // get only tag columns from metadata
let schema = chunk let schema = chunk
.table_schema(Selection::All) .table_schema(Selection::All)
.expect("to be able to get table schema"); .map_err(|e| Box::new(e) as _)
let column_names: Vec<&str> = schema .context(GettingTableSchema {
.tags_iter() table_name,
.map(|f| f.name().as_str()) chunk_id,
.collect::<Vec<&str>>(); })?;
let selection = Selection::Some(&column_names); let column_names: Vec<&str> = schema
.tags_iter()
.map(|f| f.name().as_str())
.collect::<Vec<&str>>();
// filter the columns further from the predicate let selection = Selection::Some(&column_names);
let maybe_names = chunk
.column_names(&predicate, selection)
.map_err(|e| Box::new(e) as _)
.context(FindingColumnNames)?;
match maybe_names { // filter the columns further from the predicate
Some(mut names) => { let maybe_names = chunk
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata"); .column_names(&predicate, selection)
known_columns.append(&mut names); .map_err(|e| Box::new(e) as _)
} .context(FindingColumnNames)?;
None => {
debug!( match maybe_names {
table_name = table_name.as_str(), Some(mut names) => {
chunk_id = chunk.id(), debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
"column names need full plan" known_columns.append(&mut names);
); }
// can't get columns only from metadata, need None => {
// a general purpose plan debug!(
need_full_plans table_name,
.entry(table_name) chunk_id = chunk.id(),
.or_insert_with(Vec::new) "column names need full plan"
.push(Arc::clone(&chunk)); );
} // can't get columns only from metadata, need
// a general purpose plan
need_full_plans
.entry(table_name.to_string())
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
} }
} }
} }
@ -346,70 +365,79 @@ impl InfluxRpcPlanner {
let mut known_values = BTreeSet::new(); let mut known_values = BTreeSet::new();
for chunk in database.chunks(&predicate) { for chunk in database.chunks(&predicate) {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; // Try and apply the predicate using only metadata
let pred_result = chunk
.apply_predicate(&predicate)
.map_err(|e| Box::new(e) as _)
.context(CheckingChunkPredicate {
chunk_id: chunk.id(),
})?;
for table_name in table_names { if matches!(pred_result, PredicateMatch::Zero) {
debug!( continue;
table_name = table_name.as_str(), }
chunk_id = chunk.id(), let table_name = chunk.table_name();
"finding columns in table" let chunk_id = chunk.id();
); debug!(table_name, chunk_id, "finding columns in table");
// use schema to validate column type // use schema to validate column type
let schema = chunk let schema = chunk
.table_schema(Selection::All) .table_schema(Selection::All)
.expect("to be able to get table schema"); .map_err(|e| Box::new(e) as _)
.context(GettingTableSchema {
table_name,
chunk_id,
})?;
// Skip this table if the tag_name is not a column in this table // Skip this table if the tag_name is not a column in this table
let idx = if let Some(idx) = schema.find_index_of(tag_name) { let idx = if let Some(idx) = schema.find_index_of(tag_name) {
idx idx
} else { } else {
continue; continue;
}; };
// Validate that this really is a Tag column // Validate that this really is a Tag column
let (influx_column_type, field) = schema.field(idx); let (influx_column_type, field) = schema.field(idx);
ensure!( ensure!(
matches!(influx_column_type, Some(InfluxColumnType::Tag)), matches!(influx_column_type, Some(InfluxColumnType::Tag)),
InvalidTagColumn { InvalidTagColumn {
tag_name, tag_name,
influx_column_type, influx_column_type,
} }
); );
ensure!( ensure!(
influx_column_type influx_column_type
.unwrap() .unwrap()
.valid_arrow_type(field.data_type()), .valid_arrow_type(field.data_type()),
InternalInvalidTagType { InternalInvalidTagType {
tag_name, tag_name,
data_type: field.data_type().clone(), data_type: field.data_type().clone(),
} }
); );
// try and get the list of values directly from metadata // try and get the list of values directly from metadata
let maybe_values = chunk let maybe_values = chunk
.column_values(tag_name, &predicate) .column_values(tag_name, &predicate)
.map_err(|e| Box::new(e) as _) .map_err(|e| Box::new(e) as _)
.context(FindingColumnValues)?; .context(FindingColumnValues)?;
match maybe_values { match maybe_values {
Some(mut names) => { Some(mut names) => {
debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata"); debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata");
known_values.append(&mut names); known_values.append(&mut names);
} }
None => { None => {
debug!( debug!(
table_name = table_name.as_str(), table_name,
chunk_id = chunk.id(), chunk_id = chunk.id(),
"need full plan to find column values" "need full plan to find column values"
); );
// can't get columns only from metadata, need // can't get columns only from metadata, need
// a general purpose plan // a general purpose plan
need_full_plans need_full_plans
.entry(table_name) .entry(table_name.to_string())
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
.push(Arc::clone(&chunk)); .push(Arc::clone(&chunk));
}
} }
} }
} }
@ -609,7 +637,7 @@ impl InfluxRpcPlanner {
Ok(ss_plans.into()) Ok(ss_plans.into())
} }
/// Creates a map of table_name --> Chunks that have that table /// Creates a map of table_name --> Chunks that have that table that *may* pass the predicate
fn group_chunks_by_table<C>( fn group_chunks_by_table<C>(
&self, &self,
predicate: &Predicate, predicate: &Predicate,
@ -620,56 +648,32 @@ impl InfluxRpcPlanner {
{ {
let mut table_chunks = BTreeMap::new(); let mut table_chunks = BTreeMap::new();
for chunk in chunks { for chunk in chunks {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; // Try and apply the predicate using only metadata
for table_name in table_names { let pred_result = chunk
table_chunks .apply_predicate(&predicate)
.entry(table_name) .map_err(|e| Box::new(e) as _)
.or_insert_with(Vec::new) .context(CheckingChunkPredicate {
.push(Arc::clone(&chunk)); chunk_id: chunk.id(),
})?;
match pred_result {
PredicateMatch::AtLeastOne |
// have to include chunk as we can't rule it out
PredicateMatch::Unknown => {
let table_name = chunk.table_name().to_string();
table_chunks
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
// Skip chunk here based on metadata
PredicateMatch::Zero => {
}
} }
} }
Ok(table_chunks) Ok(table_chunks)
} }
/// Find all the table names in the specified chunk that pass the predicate
fn chunk_table_names<C>(&self, chunk: &C, predicate: &Predicate) -> Result<BTreeSet<String>>
where
C: PartitionChunk + 'static,
{
let no_tables = StringSet::new();
// try and get the table names that have rows that match the predicate
let table_names = chunk
.table_names(&predicate, &no_tables)
.map_err(|e| Box::new(e) as _)
.context(TableNamePlan)?;
debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables");
let table_names = match table_names {
Some(table_names) => {
debug!("found table names with original predicate");
table_names
}
None => {
// couldn't find table names with predicate, get all chunk tables,
// fall back to filtering ourself
let table_name_predicate = if let Some(table_names) = &predicate.table_names {
PredicateBuilder::new().tables(table_names).build()
} else {
Predicate::default()
};
chunk
.table_names(&table_name_predicate, &no_tables)
.map_err(|e| Box::new(e) as _)
.context(InternalTableNamePlanForDefault)?
// unwrap the Option (table didn't match)
.unwrap_or(no_tables)
}
};
Ok(table_names)
}
/// Creates a DataFusion LogicalPlan that returns column *names* as a /// Creates a DataFusion LogicalPlan that returns column *names* as a
/// single column of Strings for a specific table /// single column of Strings for a specific table
/// ///
@ -1109,11 +1113,11 @@ impl InfluxRpcPlanner {
let chunk_id = chunk.id(); let chunk_id = chunk.id();
// check that it is consistent with this table_name // check that it is consistent with this table_name
assert!( assert_eq!(
chunk.has_table(table_name), chunk.table_name(),
"Chunk {} did not have table {}, while trying to make a plan for it", table_name,
"Chunk {} expected table mismatch",
chunk.id(), chunk.id(),
table_name
); );
let chunk_table_schema = chunk let chunk_table_schema = chunk

View File

@ -11,6 +11,7 @@ use data_types::chunk_metadata::ChunkSummary;
use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::SendableRecordBatchStream;
use exec::{stringset::StringSet, Executor}; use exec::{stringset::StringSet, Executor};
use internal_types::{schema::Schema, selection::Selection}; use internal_types::{schema::Schema, selection::Selection};
use predicate::PredicateMatch;
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
@ -59,32 +60,16 @@ pub trait PartitionChunk: Debug + Send + Sync {
/// particular partition. /// particular partition.
fn id(&self) -> u32; fn id(&self) -> u32;
/// Returns true if this chunk contains data for the specified table /// Returns the name of the table stored in this chunk
fn has_table(&self, table_name: &str) -> bool; fn table_name(&self) -> &str;
/// Returns all table names from this chunk that have at least one /// Returns the result of applying the `predicate` to the chunk
/// row that matches the `predicate` and are not already in `known_tables`. /// using an efficient, but inexact method, based on metadata.
/// ///
/// If the predicate cannot be evaluated (e.g it has predicates /// NOTE: This method is suitable for calling during planning, and
/// that cannot be directly evaluated in the chunk), `None` is /// may return PredicateMatch::Unknown for certain types of
/// returned. /// predicates.
/// fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch, Self::Error>;
/// `known_tables` is a list of table names already known to be in
/// other chunks from the same partition. It may be empty or
/// contain `table_names` not in this chunk.
fn table_names(
&self,
predicate: &Predicate,
known_tables: &StringSet,
) -> Result<Option<StringSet>, Self::Error>;
/// Adds all table names from this chunk without any predicate to
/// `known_tables)
///
/// `known_tables` is a list of table names already known to be in
/// other chunks from the same partition. It may be empty or
/// contain `table_names` not in this chunk.
fn all_table_names(&self, known_tables: &mut StringSet);
/// Returns a set of Strings with column names from the specified /// Returns a set of Strings with column names from the specified
/// table that have at least one row that matches `predicate`, if /// table that have at least one row that matches `predicate`, if

View File

@ -101,6 +101,14 @@ impl StringSetPlanBuilder {
&self.strings &self.strings
} }
/// Append the name of a table to the known strings
pub fn append_table(mut self, table_name: &str) -> Self {
if !self.strings.contains(table_name) {
self.strings.insert(table_name.to_string());
}
self
}
/// Append the strings from the passed plan into ourselves if possible, or /// Append the strings from the passed plan into ourselves if possible, or
/// passes on the plan /// passes on the plan
pub fn append(mut self, other: StringSetPlan) -> Self { pub fn append(mut self, other: StringSetPlan) -> Self {

View File

@ -28,6 +28,20 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
partition_key: None, partition_key: None,
}; };
#[derive(Debug, Clone, Copy)]
/// The result of evaluating a predicate on a set of rows
pub enum PredicateMatch {
/// There is at least one row that matches the predicate
AtLeastOne,
/// There are exactly zero rows that match the predicate
Zero,
/// There *may* be rows that match, OR there *may* be no rows that
/// match
Unknown,
}
/// Represents a parsed predicate for evaluation by the /// Represents a parsed predicate for evaluation by the
/// TSDatabase InfluxDB IOx query engine. /// TSDatabase InfluxDB IOx query engine.
/// ///

View File

@ -14,7 +14,7 @@ use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBa
use crate::exec::Executor; use crate::exec::Executor;
use crate::{ use crate::{
exec::stringset::{StringSet, StringSetRef}, exec::stringset::{StringSet, StringSetRef},
Database, DatabaseStore, PartitionChunk, Predicate, Database, DatabaseStore, PartitionChunk, Predicate, PredicateMatch,
}; };
use internal_types::{ use internal_types::{
@ -133,6 +133,9 @@ pub struct TestChunk {
/// A saved error that is returned instead of actual results /// A saved error that is returned instead of actual results
saved_error: Option<String>, saved_error: Option<String>,
/// Return value for apply_predicate, if desired
predicate_match: Option<PredicateMatch>,
} }
impl TestChunk { impl TestChunk {
@ -150,6 +153,12 @@ impl TestChunk {
self self
} }
/// specify that any call to apply_predicate should return this value
pub fn with_predicate_match(mut self, predicate_match: PredicateMatch) -> Self {
self.predicate_match = Some(predicate_match);
self
}
/// Checks the saved error, and returns it if any, otherwise returns OK /// Checks the saved error, and returns it if any, otherwise returns OK
fn check_error(&self) -> Result<()> { fn check_error(&self) -> Result<()> {
if let Some(message) = self.saved_error.as_ref() { if let Some(message) = self.saved_error.as_ref() {
@ -307,6 +316,10 @@ impl PartitionChunk for TestChunk {
self.id self.id
} }
fn table_name(&self) -> &str {
self.table_name.as_deref().unwrap()
}
fn read_filter( fn read_filter(
&self, &self,
predicate: &Predicate, predicate: &Predicate,
@ -322,29 +335,31 @@ impl PartitionChunk for TestChunk {
Ok(Box::pin(stream)) Ok(Box::pin(stream))
} }
fn table_names( fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
&self,
predicate: &Predicate,
_known_tables: &StringSet,
) -> Result<Option<StringSet>, Self::Error> {
self.check_error()?; self.check_error()?;
// save the predicate // save the predicate
self.predicates.lock().push(predicate.clone()); self.predicates.lock().push(predicate.clone());
// do basic filtering based on table name predicate. // check if there is a saved result to return
if let Some(&predicate_match) = self.predicate_match.as_ref() {
return Ok(predicate_match);
}
Ok(self // otherwise fall back to basic filtering based on table name predicate.
let predicate_match = self
.table_name .table_name
.as_ref() .as_ref()
.filter(|table_name| predicate.should_include_table(&table_name)) .map(|table_name| {
.map(|table_name| std::iter::once(table_name.to_string()).collect::<StringSet>())) if !predicate.should_include_table(&table_name) {
} PredicateMatch::Zero
} else {
PredicateMatch::Unknown
}
})
.unwrap_or(PredicateMatch::Unknown);
fn all_table_names(&self, known_tables: &mut StringSet) { Ok(predicate_match)
if let Some(table_name) = self.table_name.as_ref() {
known_tables.insert(table_name.to_string());
}
} }
fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error> { fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error> {
@ -366,13 +381,6 @@ impl PartitionChunk for TestChunk {
Ok(None) Ok(None)
} }
fn has_table(&self, table_name: &str) -> bool {
self.table_name
.as_ref()
.map(|n| n == table_name)
.unwrap_or(false)
}
fn column_names( fn column_names(
&self, &self,
predicate: &Predicate, predicate: &Predicate,

View File

@ -280,14 +280,7 @@ impl Chunk {
chunk: Arc<parquet_file::chunk::Chunk>, chunk: Arc<parquet_file::chunk::Chunk>,
metrics: ChunkMetrics, metrics: ChunkMetrics,
) -> Self { ) -> Self {
// workaround until https://github.com/influxdata/influxdb_iox/issues/1295 is fixed let table_name = Arc::from(chunk.table_name());
let table_name = Arc::from(
chunk
.table_names(None)
.next()
.expect("chunk must have exactly 1 table")
.as_ref(),
);
// Cache table summary + schema // Cache table summary + schema
let meta = Arc::new(ChunkMetadata { let meta = Arc::new(ChunkMetadata {

View File

@ -133,11 +133,7 @@ impl Partition {
chunk_id: u32, chunk_id: u32,
chunk: Arc<parquet_file::chunk::Chunk>, chunk: Arc<parquet_file::chunk::Chunk>,
) -> Result<Arc<RwLock<Chunk>>> { ) -> Result<Arc<RwLock<Chunk>>> {
// workaround until https://github.com/influxdata/influxdb_iox/issues/1295 is fixed let table_name = chunk.table_name().to_string();
let table_name = chunk
.table_names(None)
.next()
.expect("chunk must have exactly 1 table");
let chunk = Arc::new(self.metrics.new_lock(Chunk::new_object_store_only( let chunk = Arc::new(self.metrics.new_lock(Chunk::new_object_store_only(
chunk_id, chunk_id,

View File

@ -12,7 +12,11 @@ use mutable_buffer::chunk::snapshot::ChunkSnapshot;
use object_store::path::Path; use object_store::path::Path;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use parquet_file::chunk::Chunk as ParquetChunk; use parquet_file::chunk::Chunk as ParquetChunk;
use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk}; use query::{
exec::stringset::StringSet,
predicate::{Predicate, PredicateMatch},
PartitionChunk,
};
use read_buffer::Chunk as ReadBufferChunk; use read_buffer::Chunk as ReadBufferChunk;
use super::{ use super::{
@ -196,26 +200,35 @@ impl PartitionChunk for DbChunk {
self.id self.id
} }
fn all_table_names(&self, known_tables: &mut StringSet) { fn table_name(&self) -> &str {
// TODO remove this function (use name from TableSummary directly!) self.table_name.as_ref()
let table_name = &self.meta.table_summary.name;
if !known_tables.contains(table_name) {
known_tables.insert(table_name.to_string());
}
} }
fn table_names( fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
&self, if !predicate.should_include_table(self.table_name().as_ref()) {
predicate: &Predicate, return Ok(PredicateMatch::Zero);
_known_tables: &StringSet, // TODO: Should this be being used? }
) -> Result<Option<StringSet>, Self::Error> {
let names = match &self.state { // TODO apply predicate pruning here...
let pred_result = match &self.state {
State::MutableBuffer { chunk, .. } => { State::MutableBuffer { chunk, .. } => {
if predicate.has_exprs() { if predicate.has_exprs() {
// TODO: Support more predicates // TODO: Support more predicates
return Ok(None); PredicateMatch::Unknown
} else if chunk.has_timerange(&predicate.range) {
// Note: this isn't precise / correct: if the
// chunk has the timerange, some other part of the
// predicate may rule out the rows, and thus
// without further work this clause should return
// "Unknown" rather than falsely claiming that
// there is at least one row:
//
// https://github.com/influxdata/influxdb_iox/issues/1590
PredicateMatch::AtLeastOne
} else {
PredicateMatch::Zero
} }
chunk.table_names(predicate.range)
} }
State::ReadBuffer { chunk, .. } => { State::ReadBuffer { chunk, .. } => {
// If not supported, ReadBuffer can't answer with // If not supported, ReadBuffer can't answer with
@ -224,23 +237,35 @@ impl PartitionChunk for DbChunk {
Ok(rb_predicate) => rb_predicate, Ok(rb_predicate) => rb_predicate,
Err(e) => { Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back"); debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back");
return Ok(None); return Ok(PredicateMatch::Unknown);
} }
}; };
chunk.table_names(&rb_predicate, &BTreeSet::new()) // TODO align API in read_buffer
let table_names = chunk.table_names(&rb_predicate, &BTreeSet::new());
if !table_names.is_empty() {
// As above, this should really be "Unknown" rather than AtLeastOne
// for precision / correctness.
PredicateMatch::AtLeastOne
} else {
PredicateMatch::Zero
}
}
State::ParquetFile { chunk, .. } => {
if predicate.has_exprs() {
// TODO: Support more predicates
PredicateMatch::Unknown
} else if chunk.has_timerange(predicate.range.as_ref()) {
// As above, this should really be "Unknown" rather than AtLeastOne
// for precision / correctness.
PredicateMatch::AtLeastOne
} else {
PredicateMatch::Zero
}
} }
State::ParquetFile { chunk, .. } => chunk.table_names(predicate.range).collect(),
}; };
// Prune out tables that should not be Ok(pred_result)
// present (based on additional table restrictions of the Predicate)
Ok(Some(
names
.into_iter()
.filter(|table_name| predicate.should_include_table(table_name))
.collect(),
))
} }
fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error> { fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error> {
@ -253,10 +278,6 @@ impl PartitionChunk for DbChunk {
}) })
} }
fn has_table(&self, table_name: &str) -> bool {
table_name == self.meta.table_summary.name
}
fn read_filter( fn read_filter(
&self, &self,
predicate: &Predicate, predicate: &Predicate,

View File

@ -36,7 +36,7 @@ macro_rules! run_table_schema_test_case {
let predicate = PredicateBuilder::new().table(table_name).build(); let predicate = PredicateBuilder::new().table(table_name).build();
for chunk in db.chunks(&predicate) { for chunk in db.chunks(&predicate) {
if chunk.has_table(table_name) { if chunk.table_name().as_ref() == table_name {
chunks_with_table += 1; chunks_with_table += 1;
let actual_schema = chunk.table_schema(selection.clone()).unwrap(); let actual_schema = chunk.table_schema(selection.clone()).unwrap();

View File

@ -1171,7 +1171,7 @@ mod tests {
use super::*; use super::*;
use datafusion::logical_plan::{col, lit, Expr}; use datafusion::logical_plan::{col, lit, Expr};
use panic_logging::SendPanicsToTracing; use panic_logging::SendPanicsToTracing;
use query::{test::TestChunk, test::TestDatabaseStore}; use query::{predicate::PredicateMatch, test::TestChunk, test::TestDatabaseStore};
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
@ -1252,8 +1252,13 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk0 = TestChunk::new(0).with_table("h2o"); let chunk0 = TestChunk::new(0)
let chunk1 = TestChunk::new(1).with_table("o2"); .with_predicate_match(PredicateMatch::AtLeastOne)
.with_table("h2o");
let chunk1 = TestChunk::new(1)
.with_predicate_match(PredicateMatch::AtLeastOne)
.with_table("o2");
fixture fixture
.test_storage .test_storage
@ -1409,7 +1414,9 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down"); let chunk = TestChunk::new(0)
.with_table("my_table")
.with_error("Sugar we are going down");
fixture fixture
.test_storage .test_storage
@ -1535,6 +1542,7 @@ mod tests {
let chunk = TestChunk::new(0) let chunk = TestChunk::new(0)
// predicate specifies m4, so this is filtered out // predicate specifies m4, so this is filtered out
.with_table("my_table")
.with_error("This is an error"); .with_error("This is an error");
fixture fixture
@ -1640,7 +1648,9 @@ mod tests {
tag_key: [0].into(), tag_key: [0].into(),
}; };
let chunk = TestChunk::new(0).with_table("h2o"); let chunk = TestChunk::new(0)
.with_predicate_match(PredicateMatch::AtLeastOne)
.with_table("h2o");
fixture fixture
.test_storage .test_storage
@ -1717,7 +1727,9 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down"); let chunk = TestChunk::new(0)
.with_table("my_table")
.with_error("Sugar we are going down");
fixture fixture
.test_storage .test_storage
@ -1835,7 +1847,9 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down"); let chunk = TestChunk::new(0)
.with_table("my_table")
.with_error("Sugar we are going down");
fixture fixture
.test_storage .test_storage
@ -1985,7 +1999,9 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down"); let chunk = TestChunk::new(0)
.with_table("my_table")
.with_error("Sugar we are going down");
fixture fixture
.test_storage .test_storage
@ -2077,7 +2093,9 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down"); let chunk = TestChunk::new(0)
.with_table("my_table")
.with_error("Sugar we are going down");
fixture fixture
.test_storage .test_storage
@ -2287,7 +2305,9 @@ mod tests {
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down"); let chunk = TestChunk::new(0)
.with_table("my_table")
.with_error("Sugar we are going down");
fixture fixture
.test_storage .test_storage