From aacd91db941c276b7a172cfdea22c870a992e580 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 4 Jan 2023 11:55:18 +1100 Subject: [PATCH] feat: Teach InfluxQL rewriter how to name columns (#6481) * feat: Add timestamp data type * feat: Add with_quiet API to suppress output to STDOUT * fix: Field name resolution to match InfluxQL * refactor: Allow TestChunks to be directly accessed This will be useful when testing the InfluxQL planner. * fix: Add Timestamp case to var_ref module * feat: Add InfluxQL compatible column naming * chore: Add doc comment. * fix: keywords may be followed by a `!` such as `!=` * fix: field_name improvements * No longer clones expressions * Explicitly handle all Expr enumerated items * more tests * fix: collision with explicitly aliased column Fixes case where column is explicitly aliased to an auto-named variant. Test case added to validate. --- .../src/expression/arithmetic.rs | 3 + influxdb_influxql_parser/src/keywords.rs | 8 ++ iox_query/src/plan/influxql/field.rs | 37 ++++-- iox_query/src/plan/influxql/rewriter.rs | 119 ++++++++++++++--- iox_query/src/plan/influxql/test_utils.rs | 125 ++++++++---------- iox_query/src/plan/influxql/var_ref.rs | 3 +- iox_query/src/test.rs | 18 ++- 7 files changed, 214 insertions(+), 99 deletions(-) diff --git a/influxdb_influxql_parser/src/expression/arithmetic.rs b/influxdb_influxql_parser/src/expression/arithmetic.rs index 9f286dc7b3..223dc3e844 100644 --- a/influxdb_influxql_parser/src/expression/arithmetic.rs +++ b/influxdb_influxql_parser/src/expression/arithmetic.rs @@ -191,6 +191,8 @@ pub enum VarRefDataType { Field, /// Represents a tag. Tag, + /// Represents a timestamp. + Timestamp, } impl VarRefDataType { @@ -215,6 +217,7 @@ impl Display for VarRefDataType { Self::Boolean => f.write_str("boolean"), Self::Tag => f.write_str("tag"), Self::Field => f.write_str("field"), + Self::Timestamp => f.write_str("timestamp"), } } } diff --git a/influxdb_influxql_parser/src/keywords.rs b/influxdb_influxql_parser/src/keywords.rs index 104eba4a8d..75fafabe49 100644 --- a/influxdb_influxql_parser/src/keywords.rs +++ b/influxdb_influxql_parser/src/keywords.rs @@ -25,6 +25,7 @@ fn keyword_follow_char(i: &str) -> ParseResult<&str, &str> { tag("\t"), tag(","), tag("="), + tag("!"), // possible != tag("/"), // possible comment tag("-"), // possible comment eof, @@ -277,6 +278,13 @@ mod test { // Will fail because keyword `OR` in `ORDER` is not recognized, as is not terminated by a valid character let err = or_keyword("ORDER").unwrap_err(); assert_matches!(err, nom::Err::Error(crate::internal::Error::Nom(_, kind)) if kind == nom::error::ErrorKind::Fail); + + // test valid follow-on characters + let mut tag_keyword = keyword("TAG"); + + let (rem, got) = tag_keyword("tag!").unwrap(); + assert_eq!(rem, "!"); + assert_eq!(got, "tag"); } #[test] diff --git a/iox_query/src/plan/influxql/field.rs b/iox_query/src/plan/influxql/field.rs index c8df0a58de..d579cda212 100644 --- a/iox_query/src/plan/influxql/field.rs +++ b/iox_query/src/plan/influxql/field.rs @@ -1,6 +1,7 @@ use influxdb_influxql_parser::expression::Expr; use influxdb_influxql_parser::select::{Field, SelectStatement}; use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResult}; +use std::ops::Deref; /// Returns the name of the field. /// @@ -14,15 +15,17 @@ pub(crate) fn field_name(f: &Field) -> String { return alias.to_string(); } - match &f.expr { - Expr::Call { name, .. } => name.clone(), - Expr::Nested(nested) => field_name(&Field { - expr: *nested.clone(), - alias: None, - }), - Expr::Binary { .. } => binary_expr_name(&f.expr), - Expr::VarRef { name, .. } => name.to_string(), - _ => "".to_string(), + let mut expr = &f.expr; + loop { + expr = match expr { + Expr::Call { name, .. } => return name.clone(), + Expr::Nested(nested) => nested, + Expr::Binary { .. } => return binary_expr_name(&f.expr), + Expr::UnaryOp(_, nested) => nested, + Expr::Distinct(_) => return "distinct".to_string(), + Expr::VarRef { name, .. } => return name.deref().into(), + Expr::Wildcard(_) | Expr::BindParameter(_) | Expr::Literal(_) => return "".to_string(), + }; } } @@ -103,6 +106,22 @@ mod test { let f = get_first_field("SELECT 1+2 FROM cpu"); assert_eq!(field_name(&f), ""); + + let f = get_first_field("SELECT 1 + usage FROM cpu"); + assert_eq!(field_name(&f), "usage"); + + let f = get_first_field("SELECT /reg/ FROM cpu"); + assert_eq!(field_name(&f), ""); + + let f = get_first_field("SELECT DISTINCT usage FROM cpu"); + assert_eq!(field_name(&f), "distinct"); + + let f = get_first_field("SELECT -usage FROM cpu"); + assert_eq!(field_name(&f), "usage"); + + // Doesn't quote keyword + let f = get_first_field("SELECT \"user\" FROM cpu"); + assert_eq!(field_name(&f), "user"); } #[test] diff --git a/iox_query/src/plan/influxql/rewriter.rs b/iox_query/src/plan/influxql/rewriter.rs index afdc6fdf50..55d48e655c 100644 --- a/iox_query/src/plan/influxql/rewriter.rs +++ b/iox_query/src/plan/influxql/rewriter.rs @@ -18,7 +18,7 @@ use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResu use itertools::Itertools; use query_functions::clean_non_meta_escapes; use std::borrow::Borrow; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::Deref; use std::sync::Arc; @@ -210,7 +210,7 @@ fn walk_expr_mut(expr: &mut Expr, visit: &mut impl FnMut(&mut Expr) -> Result<() } /// Perform a depth-first traversal of the expression tree. -fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>) -> Result<()> { +pub(crate) fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>) -> Result<()> { match expr { Expr::Binary { lhs, rhs, .. } => { walk_expr(lhs, visit)?; @@ -488,6 +488,42 @@ fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc Result<()> { + let names = field_list.iter().map(field_name).collect::>(); + let mut column_aliases = HashMap::<&str, _>::from_iter(names.iter().map(|f| (f.as_str(), 0))); + names + .iter() + .zip(field_list.iter_mut()) + .for_each(|(name, field)| { + // Generate a new name if there is an existing alias + field.alias = Some(match column_aliases.get(name.as_str()) { + Some(0) => { + column_aliases.insert(name, 1); + name.as_str().into() + } + Some(count) => { + let mut count = *count; + loop { + let resolved_name = format!("{}_{}", name, count); + if column_aliases.contains_key(resolved_name.as_str()) { + count += 1; + } else { + column_aliases.insert(name, count + 1); + break resolved_name.as_str().into(); + } + } + } + None => unreachable!(), + }) + }); + + Ok(()) +} + /// Recursively rewrite the specified [`SelectStatement`], expanding any wildcards or regular expressions /// found in the projection list, `FROM` clause or `GROUP BY` clause. pub(crate) fn rewrite_statement( @@ -497,6 +533,7 @@ pub(crate) fn rewrite_statement( let mut stmt = q.clone(); rewrite_from(&mut stmt, Arc::clone(&schema))?; rewrite_field_list(&mut stmt, schema)?; + rewrite_field_list_aliases(&mut stmt.fields)?; Ok(stmt) } @@ -525,7 +562,32 @@ mod test { // Exact, match let stmt = parse_select("SELECT usage_user FROM cpu"); let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); - assert_eq!(stmt.to_string(), "SELECT usage_user::float FROM cpu"); + assert_eq!( + stmt.to_string(), + "SELECT usage_user::float AS usage_user FROM cpu" + ); + + // Duplicate columns do not have conflicting aliases + let stmt = parse_select("SELECT usage_user, usage_user FROM cpu"); + let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + assert_eq!( + stmt.to_string(), + "SELECT usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu" + ); + + // Multiple aliases with no conflicts + let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user FROM cpu"); + let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + assert_eq!( + stmt.to_string(), + "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu" + ); + + // Multiple aliases with conflicts + let stmt = + parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu"); + let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + assert_eq!(stmt.to_string(), "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu"); // Rewriting FROM clause @@ -534,7 +596,7 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT bytes_free::integer FROM disk, diskio" + "SELECT bytes_free::integer AS bytes_free FROM disk, diskio" ); // Exact, no match @@ -554,14 +616,14 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT host::tag, region::tag, usage_idle::float, usage_system::float, usage_user::float FROM cpu" + "SELECT host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu" ); let stmt = parse_select("SELECT * FROM cpu, disk"); let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT bytes_free::integer, bytes_used::integer, host::tag, region::tag, usage_idle::float, usage_system::float, usage_user::float FROM cpu, disk" + "SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk" ); // Regular expression selects fields from multiple measurements @@ -569,20 +631,23 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT bytes_free::integer, bytes_used::integer, usage_idle::float, usage_system::float, usage_user::float FROM cpu, disk" + "SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk" ); // Selective wildcard for tags let stmt = parse_select("SELECT *::tag FROM cpu"); let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); - assert_eq!(stmt.to_string(), "SELECT host::tag, region::tag FROM cpu"); + assert_eq!( + stmt.to_string(), + "SELECT host::tag AS host, region::tag AS region FROM cpu" + ); // Selective wildcard for fields let stmt = parse_select("SELECT *::field FROM cpu"); let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT usage_idle::float, usage_system::float, usage_user::float FROM cpu" + "SELECT usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu" ); // Mixed fields and wildcards @@ -590,7 +655,7 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT usage_idle::float, host::tag, region::tag FROM cpu" + "SELECT usage_idle::float AS usage_idle, host::tag AS host, region::tag AS region FROM cpu" ); // GROUP BY expansion @@ -599,14 +664,14 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT usage_idle::float FROM cpu GROUP BY host" + "SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host" ); let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *"); let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT usage_idle::float FROM cpu GROUP BY host, region" + "SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host, region" ); // Fallible @@ -623,7 +688,7 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT usage_idle::float FROM (SELECT usage_idle::float FROM cpu)" + "SELECT usage_idle::float AS usage_idle FROM (SELECT usage_idle::float FROM cpu)" ); // Subquery, regex, match @@ -631,7 +696,7 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT bytes_free::integer FROM (SELECT bytes_free::integer FROM disk, diskio)" + "SELECT bytes_free::integer AS bytes_free FROM (SELECT bytes_free::integer FROM disk, diskio)" ); // Subquery, exact, no match @@ -639,7 +704,7 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT usage_idle FROM (SELECT usage_idle )" + "SELECT usage_idle AS usage_idle FROM (SELECT usage_idle )" ); // Subquery, regex, no match @@ -647,7 +712,7 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT bytes_free FROM (SELECT bytes_free )" + "SELECT bytes_free AS bytes_free FROM (SELECT bytes_free )" ); // Binary expression @@ -655,7 +720,15 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT bytes_free::integer + bytes_used::integer FROM disk" + "SELECT bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk" + ); + + // Unary expressions + let stmt = parse_select("SELECT -bytes_free FROM disk"); + let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + assert_eq!( + stmt.to_string(), + "SELECT -bytes_free::integer AS bytes_free FROM disk" ); // Call expressions @@ -664,14 +737,22 @@ mod test { let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_i64::integer) FROM temp_01" + "SELECT COUNT(field_i64::integer) AS COUNT FROM temp_01" + ); + + // Duplicate aggregate columns + let stmt = parse_select("SELECT COUNT(field_i64), COUNT(field_i64) FROM temp_01"); + let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + assert_eq!( + stmt.to_string(), + "SELECT COUNT(field_i64::integer) AS COUNT, COUNT(field_i64::integer) AS COUNT_1 FROM temp_01" ); let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01"); let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_f64::float) FROM temp_01" + "SELECT COUNT(field_f64::float) AS COUNT FROM temp_01" ); // Expands all fields diff --git a/iox_query/src/plan/influxql/test_utils.rs b/iox_query/src/plan/influxql/test_utils.rs index 706ebcaad2..85e14575d2 100644 --- a/iox_query/src/plan/influxql/test_utils.rs +++ b/iox_query/src/plan/influxql/test_utils.rs @@ -74,6 +74,54 @@ impl MockSchemaProvider { pub(crate) fn new_schema_provider() -> Arc { Arc::new(Self {}) } + + /// Return the chunks that make up the test database. + pub(crate) fn table_chunks() -> Vec { + vec![ + TestChunk::new("cpu") + .with_quiet() + .with_tag_column("host") + .with_tag_column("region") + .with_f64_field_column("usage_user") + .with_f64_field_column("usage_system") + .with_f64_field_column("usage_idle"), + TestChunk::new("disk") + .with_quiet() + .with_tag_column("host") + .with_tag_column("region") + .with_i64_field_column("bytes_used") + .with_i64_field_column("bytes_free"), + TestChunk::new("diskio") + .with_quiet() + .with_tag_column("host") + .with_tag_column("region") + .with_tag_column("status") + .with_i64_field_column("bytes_read") + .with_i64_field_column("bytes_written") + .with_f64_field_column("read_utilization") + .with_f64_field_column("write_utilization") + .with_bool_field_column("is_local"), + // Schemas for testing merged schemas + TestChunk::new("temp_01") + .with_quiet() + .with_tag_column("shared_tag0") + .with_tag_column("shared_tag1") + .with_f64_field_column("shared_field0") + .with_f64_field_column("field_f64") + .with_i64_field_column("field_i64") + .with_string_field_column_with_stats("field_str", None, None), + TestChunk::new("temp_02") + .with_quiet() + .with_tag_column("shared_tag0") + .with_tag_column("shared_tag1") + .with_i64_field_column("shared_field0"), + TestChunk::new("temp_03") + .with_quiet() + .with_tag_column("shared_tag0") + .with_tag_column("shared_tag1") + .with_string_field_column_with_stats("shared_field0", None, None), + ] + } } impl SchemaProvider for MockSchemaProvider { @@ -82,77 +130,18 @@ impl SchemaProvider for MockSchemaProvider { } fn table_names(&self) -> Vec { - vec![ - "cpu".into(), - "disk".into(), - "diskio".into(), - "temp_01".into(), - "temp_03".into(), - "temp_03".into(), - ] - .into_iter() - .sorted() - .collect::>() + Self::table_chunks() + .iter() + .map(|c| c.table_name().into()) + .sorted() + .collect::>() } fn table(&self, name: &str) -> Option> { - let schema = match name { - "cpu" => Some( - TestChunk::new("cpu") - .with_tag_column("host") - .with_tag_column("region") - .with_f64_field_column("usage_user") - .with_f64_field_column("usage_system") - .with_f64_field_column("usage_idle") - .schema(), - ), - "disk" => Some( - TestChunk::new("disk") - .with_tag_column("host") - .with_tag_column("region") - .with_i64_field_column("bytes_used") - .with_i64_field_column("bytes_free") - .schema(), - ), - "diskio" => Some( - TestChunk::new("diskio") - .with_tag_column("host") - .with_tag_column("region") - .with_tag_column("status") - .with_i64_field_column("bytes_read") - .with_i64_field_column("bytes_written") - .with_f64_field_column("read_utilization") - .with_f64_field_column("write_utilization") - .with_bool_field_column("is_local") - .schema(), - ), - // Schemas for testing merged schemas - "temp_01" => Some( - TestChunk::new("temp_01") - .with_tag_column("shared_tag0") - .with_tag_column("shared_tag1") - .with_f64_field_column("shared_field0") - .with_f64_field_column("field_f64") - .with_i64_field_column("field_i64") - .with_string_field_column_with_stats("field_str", None, None) - .schema(), - ), - "temp_02" => Some( - TestChunk::new("temp_02") - .with_tag_column("shared_tag0") - .with_tag_column("shared_tag1") - .with_i64_field_column("shared_field0") - .schema(), - ), - "temp_03" => Some( - TestChunk::new("temp_03") - .with_tag_column("shared_tag0") - .with_tag_column("shared_tag1") - .with_string_field_column_with_stats("shared_field0", None, None) - .schema(), - ), - _ => None, - }; + let schema = Self::table_chunks() + .iter() + .find(|c| c.table_name() == name) + .map(|c| c.schema()); match schema { Some(s) => Some(Arc::new(EmptyTable::new(Arc::clone(s.inner())))), diff --git a/iox_query/src/plan/influxql/var_ref.rs b/iox_query/src/plan/influxql/var_ref.rs index d9a68f9c4b..c6669450a0 100644 --- a/iox_query/src/plan/influxql/var_ref.rs +++ b/iox_query/src/plan/influxql/var_ref.rs @@ -10,7 +10,7 @@ pub(crate) fn var_ref_data_type_to_field_type(v: VarRefDataType) -> Option Some(InfluxFieldType::Float), VarRefDataType::String => Some(InfluxFieldType::String), VarRefDataType::Boolean => Some(InfluxFieldType::Boolean), - VarRefDataType::Tag | VarRefDataType::Field => None, + VarRefDataType::Tag | VarRefDataType::Field | VarRefDataType::Timestamp => None, } } @@ -54,6 +54,7 @@ mod test { ); assert!(var_ref_data_type_to_field_type(VarRefDataType::Field).is_none()); assert!(var_ref_data_type_to_field_type(VarRefDataType::Tag).is_none()); + assert!(var_ref_data_type_to_field_type(VarRefDataType::Timestamp).is_none()); } #[test] diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 670224f334..7ae4e56ad1 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -223,6 +223,9 @@ pub struct TestChunk { /// The partition sort key of this chunk partition_sort_key: Option, + + /// Suppress output + quiet: bool, } /// Implements a method for adding a column with default stats @@ -304,9 +307,16 @@ impl TestChunk { sort_key: None, partition_sort_key: None, partition_id: PartitionId::new(0), + quiet: false, } } + /// Returns the receiver configured to suppress any output to STDOUT. + pub fn with_quiet(mut self) -> Self { + self.quiet = true; + self + } + pub fn with_id(mut self, id: u128) -> Self { self.id = ChunkId::new_test(id); self @@ -618,7 +628,9 @@ impl TestChunk { let batch = RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); - println!("TestChunk batch data: {:#?}", batch); + if !self.quiet { + println!("TestChunk batch data: {:#?}", batch); + } self.table_data.push(Arc::new(batch)); self @@ -656,7 +668,9 @@ impl TestChunk { let batch = RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); - println!("TestChunk batch data: {:#?}", batch); + if !self.quiet { + println!("TestChunk batch data: {:#?}", batch); + } self.table_data.push(Arc::new(batch)); self