feat: add the merged tag set for the `Select` query

pull/24376/head
Stuart Carnie 2023-05-15 09:30:28 +10:00
parent f7a8850f62
commit 0dd1826e3c
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
2 changed files with 56 additions and 5 deletions

View File

@ -1,7 +1,6 @@
//! Defines data structures which represent an InfluxQL
//! statement after it has been processed
use crate::plan::field::field_by_name;
use crate::plan::rewriter::ProjectionType;
use crate::plan::{error, SchemaProvider};
use datafusion::common::Result;
@ -48,6 +47,10 @@ pub(super) struct Select {
/// The GROUP BY clause of the selection.
pub(super) group_by: Option<GroupByClause>,
/// The set of possible tags for the selection, by combining
/// the tag sets of all inputs via the `FROM` clause.
pub(super) tag_set: TagSet,
/// The [fill] clause specifies the fill behaviour for the selection. If the value is [`None`],
/// it is the same behavior as `fill(null)`.
///
@ -138,12 +141,14 @@ pub(super) enum DataSourceSchema<'a> {
}
impl<'a> DataSourceSchema<'a> {
pub(super) fn field_type_by_name(&self, name: &str) -> Option<InfluxColumnType> {
/// Returns `true` if the specified name is a tag field or a projection of a tag field if
/// the `DataSource` is a subquery.
pub(super) fn is_tag_field(&self, name: &str) -> bool {
match self {
DataSourceSchema::Table(s) => s.field_type_by_name(name),
DataSourceSchema::Subquery(q) => {
field_by_name(&q.fields, name).and_then(|f| f.data_type)
DataSourceSchema::Table(s) => {
matches!(s.field_type_by_name(name), Some(InfluxColumnType::Tag))
}
DataSourceSchema::Subquery(q) => q.tag_set.contains(name),
}
}
}

View File

@ -69,6 +69,7 @@ fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Result<Select>
let from = expand_from(s, stmt)?;
let (fields, group_by) = expand_projection(s, stmt, &from)?;
let tag_set = select_tag_set(s, &from);
let SelectStatementInfo { projection_type } =
select_statement_info(&fields, &group_by, stmt.fill)?;
@ -79,6 +80,7 @@ fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Result<Select>
from,
condition: stmt.condition.clone(),
group_by,
tag_set,
fill: stmt.fill,
order_by: stmt.order_by,
limit: stmt.limit,
@ -236,6 +238,24 @@ fn from_drop_empty(s: &dyn SchemaProvider, stmt: &mut Select) {
});
}
/// Determine the combined tag set for the specified `from`.
fn select_tag_set(s: &dyn SchemaProvider, from: &[DataSource]) -> TagSet {
let mut tag_set = TagSet::new();
for ds in from {
match ds {
DataSource::Table(table_name) => {
if let Some(table) = s.table_schema(table_name) {
tag_set.extend(table.tags_iter().map(|f| f.name().to_owned()))
}
}
DataSource::Subquery(q) => tag_set.extend(q.tag_set.clone()),
}
}
tag_set
}
/// Determine the merged fields and tags of the `FROM` clause.
fn from_field_and_dimensions(
s: &dyn SchemaProvider,
@ -1712,6 +1732,7 @@ mod test {
mod rewrite_statement {
use super::*;
use crate::plan::ir::TagSet;
use datafusion::common::Result;
use influxdb_influxql_parser::select::SelectStatement;
use schema::{InfluxColumnType, InfluxFieldType};
@ -1765,6 +1786,31 @@ mod test {
);
}
/// Validate the tag_set field of a [`Select]`
#[test]
fn tag_set_schema() {
let namespace = MockSchemaProvider::default();
macro_rules! assert_tag_set {
($Q:ident, $($TAG:literal),*) => {
assert_eq!($Q.select.tag_set, TagSet::from([$($TAG.to_owned(),)*]))
};
}
let stmt = parse_select("SELECT usage_system FROM cpu");
let q = rewrite_statement(&namespace, &stmt).unwrap();
assert_tag_set!(q, "cpu", "host", "region");
let stmt = parse_select("SELECT usage_system FROM cpu, disk");
let q = rewrite_statement(&namespace, &stmt).unwrap();
assert_tag_set!(q, "cpu", "host", "region", "device");
let stmt =
parse_select("SELECT usage_system FROM (select * from cpu), (select * from disk)");
let q = rewrite_statement(&namespace, &stmt).unwrap();
assert_tag_set!(q, "cpu", "host", "region", "device");
}
/// Validating types for simple projections
#[test]
fn projection_simple() {