feat: Optimize Column Selection
parent
f5f96b0db0
commit
eb81975151
|
@ -4,3 +4,4 @@
|
|||
.env
|
||||
*.tsm
|
||||
**/.DS_Store
|
||||
**/.vscode
|
|
@ -305,6 +305,7 @@ impl Chunk {
|
|||
&self,
|
||||
table_name: &str,
|
||||
chunk_predicate: &ChunkPredicate,
|
||||
selection: Selection<'_>,
|
||||
) -> Result<Option<BTreeSet<String>>> {
|
||||
// No support for general purpose expressions
|
||||
if !chunk_predicate.chunk_exprs.is_empty() {
|
||||
|
@ -334,6 +335,16 @@ impl Chunk {
|
|||
}
|
||||
}
|
||||
|
||||
// Only return subset of these selection_cols if not all_cols
|
||||
let mut all_cols = true;
|
||||
let selection_cols = match selection {
|
||||
Selection::All => &[""],
|
||||
Selection::Some(cols) => {
|
||||
all_cols = false;
|
||||
cols
|
||||
}
|
||||
};
|
||||
|
||||
let mut column_names = BTreeSet::new();
|
||||
for &column_id in &chunk_column_ids {
|
||||
let column_name =
|
||||
|
@ -344,7 +355,8 @@ impl Chunk {
|
|||
chunk: self.id,
|
||||
})?;
|
||||
|
||||
if !column_names.contains(column_name) {
|
||||
if !column_names.contains(column_name) &&
|
||||
(all_cols || selection_cols.contains(&column_name)) { // only use columns in selection_cols
|
||||
column_names.insert(column_name.to_string());
|
||||
}
|
||||
}
|
||||
|
@ -609,6 +621,7 @@ impl query::PartitionChunk for Chunk {
|
|||
&self,
|
||||
_table_name: &str,
|
||||
_predicate: &Predicate,
|
||||
_columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
unimplemented!("This function is slated for removal")
|
||||
}
|
||||
|
|
|
@ -198,25 +198,34 @@ impl InfluxRPCPlanner {
|
|||
chunk_id = chunk.id(),
|
||||
"finding columns in table"
|
||||
);
|
||||
// try and get the list of columns directly from metadata
|
||||
|
||||
// get only tag columns from metadata
|
||||
let schema = chunk
|
||||
.table_schema(&table_name, Selection::All)
|
||||
.await
|
||||
.expect("to be able to get table schema");
|
||||
let column_names: Vec<&str> = schema
|
||||
.iter()
|
||||
.filter_map(|(influx_column_type, field)| {
|
||||
if matches!(influx_column_type, Some(InfluxColumnType::Tag)) {
|
||||
Some(field.name().as_str())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<&str>>();
|
||||
let selection = Selection::Some(&column_names);
|
||||
|
||||
// filter the columns further from the predicate
|
||||
let maybe_names = chunk
|
||||
.column_names(&table_name, &predicate)
|
||||
.column_names(&table_name, &predicate, selection)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnNames)?;
|
||||
|
||||
|
||||
match maybe_names {
|
||||
Some(names) => {
|
||||
Some(mut names) => {
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
|
||||
|
||||
// can restrict the output to only the tag columns
|
||||
let schema = chunk
|
||||
.table_schema(&table_name, Selection::All)
|
||||
.await
|
||||
.expect("to be able to get table schema");
|
||||
let mut names = self.restrict_to_tags(&schema, names);
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
|
||||
|
||||
known_columns.append(&mut names);
|
||||
}
|
||||
None => {
|
||||
|
@ -354,18 +363,6 @@ impl InfluxRPCPlanner {
|
|||
Ok(table_names)
|
||||
}
|
||||
|
||||
/// removes any columns from Names that are not "Tag"s in the Influx Data
|
||||
/// Model
|
||||
fn restrict_to_tags(&self, schema: &Schema, names: BTreeSet<String>) -> BTreeSet<String> {
|
||||
names
|
||||
.into_iter()
|
||||
.filter(|col_name| {
|
||||
let idx = schema.find_index_of(col_name).unwrap();
|
||||
matches!(schema.field(idx), (Some(InfluxColumnType::Tag), _))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Creates a DataFusion LogicalPlan that returns column *names* as a
|
||||
/// single column of Strings for a specific table
|
||||
///
|
||||
|
|
|
@ -137,6 +137,7 @@ pub trait PartitionChunk: Debug + Send + Sync {
|
|||
&self,
|
||||
table_name: &str,
|
||||
predicate: &Predicate,
|
||||
columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error>;
|
||||
|
||||
/// Returns the Schema for a table in this chunk, with the
|
||||
|
|
|
@ -496,6 +496,41 @@ impl TestChunk {
|
|||
.push(Arc::new(batch));
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
/// Returns all columns of the table
|
||||
pub fn all_column_names(
|
||||
&self,
|
||||
table_name: &str,
|
||||
) -> Option<StringSet> {
|
||||
|
||||
let column_names = self.table_schemas.get(table_name).map(|schema| {
|
||||
schema
|
||||
.iter()
|
||||
.map(|(_, field)| field.name().to_string())
|
||||
.collect::<StringSet>()
|
||||
});
|
||||
|
||||
column_names
|
||||
}
|
||||
|
||||
/// Returns just the specified columns
|
||||
pub fn specific_column_names_selection(
|
||||
&self,
|
||||
table_name: &str,
|
||||
columns: &[&str],
|
||||
) -> Option<StringSet> {
|
||||
|
||||
let column_names = self.table_schemas.get(table_name).map(|schema| {
|
||||
schema
|
||||
.iter()
|
||||
.map(|(_, field)| field.name().to_string())
|
||||
.filter(|col| columns.contains(&col.as_str()))
|
||||
.collect::<StringSet>()
|
||||
});
|
||||
|
||||
column_names
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -574,18 +609,18 @@ impl PartitionChunk for TestChunk {
|
|||
&self,
|
||||
table_name: &str,
|
||||
predicate: &Predicate,
|
||||
selection: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
self.check_error()?;
|
||||
|
||||
// save the predicate
|
||||
self.predicate.lock().replace(predicate.clone());
|
||||
|
||||
let column_names = self.table_schemas.get(table_name).map(|schema| {
|
||||
schema
|
||||
.iter()
|
||||
.map(|(_, field)| field.name().to_string())
|
||||
.collect::<StringSet>()
|
||||
});
|
||||
|
||||
// only return columns specified in selection
|
||||
let column_names = match selection {
|
||||
Selection::All => self.all_column_names(table_name),
|
||||
Selection::Some(cols) => self.specific_column_names_selection(table_name, cols)
|
||||
};
|
||||
|
||||
Ok(column_names)
|
||||
}
|
||||
|
|
|
@ -316,6 +316,7 @@ impl PartitionChunk for DBChunk {
|
|||
&self,
|
||||
table_name: &str,
|
||||
predicate: &Predicate,
|
||||
columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
match self {
|
||||
Self::MutableBuffer { chunk } => {
|
||||
|
@ -324,7 +325,7 @@ impl PartitionChunk for DBChunk {
|
|||
.context(MutableBufferChunk)?;
|
||||
|
||||
chunk
|
||||
.column_names(table_name, &chunk_predicate)
|
||||
.column_names(table_name, &chunk_predicate, columns)
|
||||
.context(MutableBufferChunk)
|
||||
}
|
||||
Self::ReadBuffer {
|
||||
|
@ -344,7 +345,7 @@ impl PartitionChunk for DBChunk {
|
|||
table_name,
|
||||
chunk_ids,
|
||||
rb_predicate,
|
||||
Selection::All,
|
||||
columns,
|
||||
)
|
||||
.context(ReadBufferChunk { chunk_id })?;
|
||||
|
||||
|
|
Loading…
Reference in New Issue