Merge pull request #886 from influxdata/ntran/optimize_column_selection
feat: Optimize Column Selectionpull/24376/head
commit
fb40183559
|
@ -4,3 +4,4 @@
|
|||
.env
|
||||
*.tsm
|
||||
**/.DS_Store
|
||||
**/.vscode
|
|
@ -337,6 +337,7 @@ impl Chunk {
|
|||
&self,
|
||||
table_name: &str,
|
||||
chunk_predicate: &ChunkPredicate,
|
||||
selection: Selection<'_>,
|
||||
) -> Result<Option<BTreeSet<String>>> {
|
||||
// No support for general purpose expressions
|
||||
if !chunk_predicate.chunk_exprs.is_empty() {
|
||||
|
@ -360,6 +361,16 @@ impl Chunk {
|
|||
}
|
||||
}
|
||||
|
||||
// Only return subset of these selection_cols if not all_cols
|
||||
let mut all_cols = true;
|
||||
let selection_cols = match selection {
|
||||
Selection::All => &[""],
|
||||
Selection::Some(cols) => {
|
||||
all_cols = false;
|
||||
cols
|
||||
}
|
||||
};
|
||||
|
||||
let mut column_names = BTreeSet::new();
|
||||
for &column_id in &chunk_column_ids {
|
||||
let column_name =
|
||||
|
@ -370,7 +381,10 @@ impl Chunk {
|
|||
chunk: self.id,
|
||||
})?;
|
||||
|
||||
if !column_names.contains(column_name) {
|
||||
if !column_names.contains(column_name)
|
||||
&& (all_cols || selection_cols.contains(&column_name))
|
||||
{
|
||||
// only use columns in selection_cols
|
||||
column_names.insert(column_name.to_string());
|
||||
}
|
||||
}
|
||||
|
@ -744,6 +758,7 @@ impl query::PartitionChunk for Chunk {
|
|||
&self,
|
||||
_table_name: &str,
|
||||
_predicate: &Predicate,
|
||||
_columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
unimplemented!("This function is slated for removal")
|
||||
}
|
||||
|
|
|
@ -228,25 +228,29 @@ impl InfluxRPCPlanner {
|
|||
chunk_id = chunk.id(),
|
||||
"finding columns in table"
|
||||
);
|
||||
// try and get the list of columns directly from metadata
|
||||
|
||||
// get only tag columns from metadata
|
||||
let schema = chunk
|
||||
.table_schema(&table_name, Selection::All)
|
||||
.await
|
||||
.expect("to be able to get table schema");
|
||||
let column_names: Vec<&str> = schema
|
||||
.tags_iter()
|
||||
.map(|f| f.name().as_str())
|
||||
.collect::<Vec<&str>>();
|
||||
|
||||
let selection = Selection::Some(&column_names);
|
||||
|
||||
// filter the columns further from the predicate
|
||||
let maybe_names = chunk
|
||||
.column_names(&table_name, &predicate)
|
||||
.column_names(&table_name, &predicate, selection)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnNames)?;
|
||||
|
||||
match maybe_names {
|
||||
Some(names) => {
|
||||
Some(mut names) => {
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
|
||||
|
||||
// can restrict the output to only the tag columns
|
||||
let schema = chunk
|
||||
.table_schema(&table_name, Selection::All)
|
||||
.await
|
||||
.expect("to be able to get table schema");
|
||||
let mut names = self.restrict_to_tags(&schema, names);
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
|
||||
|
||||
known_columns.append(&mut names);
|
||||
}
|
||||
None => {
|
||||
|
@ -533,18 +537,6 @@ impl InfluxRPCPlanner {
|
|||
Ok(table_names)
|
||||
}
|
||||
|
||||
/// removes any columns from Names that are not "Tag"s in the Influx Data
|
||||
/// Model
|
||||
fn restrict_to_tags(&self, schema: &Schema, names: BTreeSet<String>) -> BTreeSet<String> {
|
||||
names
|
||||
.into_iter()
|
||||
.filter(|col_name| {
|
||||
let idx = schema.find_index_of(col_name).unwrap();
|
||||
matches!(schema.field(idx), (Some(InfluxColumnType::Tag), _))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Creates a DataFusion LogicalPlan that returns column *names* as a
|
||||
/// single column of Strings for a specific table
|
||||
///
|
||||
|
|
|
@ -128,6 +128,7 @@ pub trait PartitionChunk: Debug + Send + Sync {
|
|||
&self,
|
||||
table_name: &str,
|
||||
predicate: &Predicate,
|
||||
columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error>;
|
||||
|
||||
/// Return a set of Strings containing the distinct values in the
|
||||
|
|
|
@ -435,6 +435,35 @@ impl TestChunk {
|
|||
.push(Arc::new(batch));
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns all columns of the table
|
||||
pub fn all_column_names(&self, table_name: &str) -> Option<StringSet> {
|
||||
let column_names = self.table_schemas.get(table_name).map(|schema| {
|
||||
schema
|
||||
.iter()
|
||||
.map(|(_, field)| field.name().to_string())
|
||||
.collect::<StringSet>()
|
||||
});
|
||||
|
||||
column_names
|
||||
}
|
||||
|
||||
/// Returns just the specified columns
|
||||
pub fn specific_column_names_selection(
|
||||
&self,
|
||||
table_name: &str,
|
||||
columns: &[&str],
|
||||
) -> Option<StringSet> {
|
||||
let column_names = self.table_schemas.get(table_name).map(|schema| {
|
||||
schema
|
||||
.iter()
|
||||
.map(|(_, field)| field.name().to_string())
|
||||
.filter(|col| columns.contains(&col.as_str()))
|
||||
.collect::<StringSet>()
|
||||
});
|
||||
|
||||
column_names
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -523,18 +552,18 @@ impl PartitionChunk for TestChunk {
|
|||
&self,
|
||||
table_name: &str,
|
||||
predicate: &Predicate,
|
||||
selection: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
self.check_error()?;
|
||||
|
||||
// save the predicate
|
||||
self.predicate.lock().replace(predicate.clone());
|
||||
|
||||
let column_names = self.table_schemas.get(table_name).map(|schema| {
|
||||
schema
|
||||
.iter()
|
||||
.map(|(_, field)| field.name().to_string())
|
||||
.collect::<StringSet>()
|
||||
});
|
||||
// only return columns specified in selection
|
||||
let column_names = match selection {
|
||||
Selection::All => self.all_column_names(table_name),
|
||||
Selection::Some(cols) => self.specific_column_names_selection(table_name, cols),
|
||||
};
|
||||
|
||||
Ok(column_names)
|
||||
}
|
||||
|
|
|
@ -316,6 +316,7 @@ impl PartitionChunk for DBChunk {
|
|||
&self,
|
||||
table_name: &str,
|
||||
predicate: &Predicate,
|
||||
columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
match self {
|
||||
Self::MutableBuffer { chunk } => {
|
||||
|
@ -324,7 +325,7 @@ impl PartitionChunk for DBChunk {
|
|||
.context(MutableBufferChunk)?;
|
||||
|
||||
chunk
|
||||
.column_names(table_name, &chunk_predicate)
|
||||
.column_names(table_name, &chunk_predicate, columns)
|
||||
.context(MutableBufferChunk)
|
||||
}
|
||||
Self::ReadBuffer {
|
||||
|
@ -339,13 +340,7 @@ impl PartitionChunk for DBChunk {
|
|||
let chunk_ids = &[chunk_id];
|
||||
|
||||
let names = db
|
||||
.column_names(
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_ids,
|
||||
rb_predicate,
|
||||
Selection::All,
|
||||
)
|
||||
.column_names(partition_key, table_name, chunk_ids, rb_predicate, columns)
|
||||
.context(ReadBufferChunk { chunk_id })?;
|
||||
|
||||
Ok(names)
|
||||
|
|
Loading…
Reference in New Issue