From 0dd1826e3ccdd26ed1458e3ee60d1f3eed64673b Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Mon, 15 May 2023 09:30:28 +1000 Subject: [PATCH] feat: add the merged tag set for the `Select` query --- iox_query_influxql/src/plan/ir.rs | 15 +++++--- iox_query_influxql/src/plan/rewriter.rs | 46 +++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/iox_query_influxql/src/plan/ir.rs b/iox_query_influxql/src/plan/ir.rs index 8117c64c39..5244317725 100644 --- a/iox_query_influxql/src/plan/ir.rs +++ b/iox_query_influxql/src/plan/ir.rs @@ -1,7 +1,6 @@ //! Defines data structures which represent an InfluxQL //! statement after it has been processed -use crate::plan::field::field_by_name; use crate::plan::rewriter::ProjectionType; use crate::plan::{error, SchemaProvider}; use datafusion::common::Result; @@ -48,6 +47,10 @@ pub(super) struct Select { /// The GROUP BY clause of the selection. pub(super) group_by: Option, + /// The set of possible tags for the selection, by combining + /// the tag sets of all inputs via the `FROM` clause. + pub(super) tag_set: TagSet, + /// The [fill] clause specifies the fill behaviour for the selection. If the value is [`None`], /// it is the same behavior as `fill(null)`. /// @@ -138,12 +141,14 @@ pub(super) enum DataSourceSchema<'a> { } impl<'a> DataSourceSchema<'a> { - pub(super) fn field_type_by_name(&self, name: &str) -> Option { + /// Returns `true` if the specified name is a tag field or a projection of a tag field if + /// the `DataSource` is a subquery. + pub(super) fn is_tag_field(&self, name: &str) -> bool { match self { - DataSourceSchema::Table(s) => s.field_type_by_name(name), - DataSourceSchema::Subquery(q) => { - field_by_name(&q.fields, name).and_then(|f| f.data_type) + DataSourceSchema::Table(s) => { + matches!(s.field_type_by_name(name), Some(InfluxColumnType::Tag)) } + DataSourceSchema::Subquery(q) => q.tag_set.contains(name), } } } diff --git a/iox_query_influxql/src/plan/rewriter.rs b/iox_query_influxql/src/plan/rewriter.rs index e1e0208c9e..ce4d4de8be 100644 --- a/iox_query_influxql/src/plan/rewriter.rs +++ b/iox_query_influxql/src/plan/rewriter.rs @@ -69,6 +69,7 @@ fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Result from, condition: stmt.condition.clone(), group_by, + tag_set, fill: stmt.fill, order_by: stmt.order_by, limit: stmt.limit, @@ -236,6 +238,24 @@ fn from_drop_empty(s: &dyn SchemaProvider, stmt: &mut Select) { }); } +/// Determine the combined tag set for the specified `from`. +fn select_tag_set(s: &dyn SchemaProvider, from: &[DataSource]) -> TagSet { + let mut tag_set = TagSet::new(); + + for ds in from { + match ds { + DataSource::Table(table_name) => { + if let Some(table) = s.table_schema(table_name) { + tag_set.extend(table.tags_iter().map(|f| f.name().to_owned())) + } + } + DataSource::Subquery(q) => tag_set.extend(q.tag_set.clone()), + } + } + + tag_set +} + /// Determine the merged fields and tags of the `FROM` clause. fn from_field_and_dimensions( s: &dyn SchemaProvider, @@ -1712,6 +1732,7 @@ mod test { mod rewrite_statement { use super::*; + use crate::plan::ir::TagSet; use datafusion::common::Result; use influxdb_influxql_parser::select::SelectStatement; use schema::{InfluxColumnType, InfluxFieldType}; @@ -1765,6 +1786,31 @@ mod test { ); } + /// Validate the tag_set field of a [`Select]` + #[test] + fn tag_set_schema() { + let namespace = MockSchemaProvider::default(); + + macro_rules! assert_tag_set { + ($Q:ident, $($TAG:literal),*) => { + assert_eq!($Q.select.tag_set, TagSet::from([$($TAG.to_owned(),)*])) + }; + } + + let stmt = parse_select("SELECT usage_system FROM cpu"); + let q = rewrite_statement(&namespace, &stmt).unwrap(); + assert_tag_set!(q, "cpu", "host", "region"); + + let stmt = parse_select("SELECT usage_system FROM cpu, disk"); + let q = rewrite_statement(&namespace, &stmt).unwrap(); + assert_tag_set!(q, "cpu", "host", "region", "device"); + + let stmt = + parse_select("SELECT usage_system FROM (select * from cpu), (select * from disk)"); + let q = rewrite_statement(&namespace, &stmt).unwrap(); + assert_tag_set!(q, "cpu", "host", "region", "device"); + } + /// Validating types for simple projections #[test] fn projection_simple() {