feat: Teach InfluxQL rewriter how to name columns (#6481)

* feat: Add timestamp data type

* feat: Add with_quiet API to suppress output to STDOUT

* fix: Field name resolution to match InfluxQL

* refactor: Allow TestChunks to be directly accessed

This will be useful when testing the InfluxQL planner.

* fix: Add Timestamp case to var_ref module

* feat: Add InfluxQL compatible column naming

* chore: Add doc comment.

* fix: keywords may be followed by a `!` such as `!=`

* fix: field_name improvements

* No longer clones expressions
* Explicitly handle all Expr enumerated items
* more tests

* fix: collision with explicitly aliased column

Fixes case where column is explicitly aliased to an auto-named variant.
Test case added to validate.
pull/24376/head
Stuart Carnie 2023-01-04 11:55:18 +11:00 committed by GitHub
parent dbe52f1ca1
commit aacd91db94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 214 additions and 99 deletions

View File

@ -191,6 +191,8 @@ pub enum VarRefDataType {
Field,
/// Represents a tag.
Tag,
/// Represents a timestamp.
Timestamp,
}
impl VarRefDataType {
@ -215,6 +217,7 @@ impl Display for VarRefDataType {
Self::Boolean => f.write_str("boolean"),
Self::Tag => f.write_str("tag"),
Self::Field => f.write_str("field"),
Self::Timestamp => f.write_str("timestamp"),
}
}
}

View File

@ -25,6 +25,7 @@ fn keyword_follow_char(i: &str) -> ParseResult<&str, &str> {
tag("\t"),
tag(","),
tag("="),
tag("!"), // possible !=
tag("/"), // possible comment
tag("-"), // possible comment
eof,
@ -277,6 +278,13 @@ mod test {
// Will fail because keyword `OR` in `ORDER` is not recognized, as it is not terminated by a valid character
let err = or_keyword("ORDER").unwrap_err();
assert_matches!(err, nom::Err::Error(crate::internal::Error::Nom(_, kind)) if kind == nom::error::ErrorKind::Fail);
// test valid follow-on characters
let mut tag_keyword = keyword("TAG");
let (rem, got) = tag_keyword("tag!").unwrap();
assert_eq!(rem, "!");
assert_eq!(got, "tag");
}
#[test]

View File

@ -1,6 +1,7 @@
use influxdb_influxql_parser::expression::Expr;
use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResult};
use std::ops::Deref;
/// Returns the name of the field.
///
@ -14,15 +15,17 @@ pub(crate) fn field_name(f: &Field) -> String {
return alias.to_string();
}
match &f.expr {
Expr::Call { name, .. } => name.clone(),
Expr::Nested(nested) => field_name(&Field {
expr: *nested.clone(),
alias: None,
}),
Expr::Binary { .. } => binary_expr_name(&f.expr),
Expr::VarRef { name, .. } => name.to_string(),
_ => "".to_string(),
let mut expr = &f.expr;
loop {
expr = match expr {
Expr::Call { name, .. } => return name.clone(),
Expr::Nested(nested) => nested,
Expr::Binary { .. } => return binary_expr_name(&f.expr),
Expr::UnaryOp(_, nested) => nested,
Expr::Distinct(_) => return "distinct".to_string(),
Expr::VarRef { name, .. } => return name.deref().into(),
Expr::Wildcard(_) | Expr::BindParameter(_) | Expr::Literal(_) => return "".to_string(),
};
}
}
@ -103,6 +106,22 @@ mod test {
let f = get_first_field("SELECT 1+2 FROM cpu");
assert_eq!(field_name(&f), "");
let f = get_first_field("SELECT 1 + usage FROM cpu");
assert_eq!(field_name(&f), "usage");
let f = get_first_field("SELECT /reg/ FROM cpu");
assert_eq!(field_name(&f), "");
let f = get_first_field("SELECT DISTINCT usage FROM cpu");
assert_eq!(field_name(&f), "distinct");
let f = get_first_field("SELECT -usage FROM cpu");
assert_eq!(field_name(&f), "usage");
// Doesn't quote keyword
let f = get_first_field("SELECT \"user\" FROM cpu");
assert_eq!(field_name(&f), "user");
}
#[test]

View File

@ -18,7 +18,7 @@ use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResu
use itertools::Itertools;
use query_functions::clean_non_meta_escapes;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
@ -210,7 +210,7 @@ fn walk_expr_mut(expr: &mut Expr, visit: &mut impl FnMut(&mut Expr) -> Result<()
}
/// Perform a depth-first traversal of the expression tree.
fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>) -> Result<()> {
pub(crate) fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>) -> Result<()> {
match expr {
Expr::Binary { lhs, rhs, .. } => {
walk_expr(lhs, visit)?;
@ -488,6 +488,42 @@ fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider
Ok(())
}
/// Resolve the outer-most `SELECT` projection list column names in accordance with the
/// [original implementation]. The names are assigned to the `alias` field of the [`Field`] struct.
///
/// [original implementation]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1651
fn rewrite_field_list_aliases(field_list: &mut FieldList) -> Result<()> {
    // Derive the InfluxQL-compatible name of every projection up front, so that
    // auto-generated aliases can be checked against *all* column names,
    // including explicitly aliased columns that appear later in the list.
    let names: Vec<String> = field_list.iter().map(field_name).collect();

    // Track, per distinct name, how many times it has been emitted so far.
    // Duplicate names collapse to a single entry, which is the desired start state.
    let mut seen = names
        .iter()
        .map(|n| (n.as_str(), 0))
        .collect::<HashMap<&str, _>>();

    for (name, field) in names.iter().zip(field_list.iter_mut()) {
        let count = *seen
            .get(name.as_str())
            .expect("every derived name was inserted above");
        field.alias = Some(if count == 0 {
            // First occurrence: the column keeps its derived name unchanged.
            seen.insert(name, 1);
            name.as_str().into()
        } else {
            // Repeated occurrence: probe `<name>_<n>` until the candidate does
            // not collide with any derived (or explicitly aliased) column name.
            let mut n = count;
            loop {
                let candidate = format!("{}_{}", name, n);
                if seen.contains_key(candidate.as_str()) {
                    n += 1;
                } else {
                    seen.insert(name, n + 1);
                    break candidate.as_str().into();
                }
            }
        });
    }
    Ok(())
}
/// Recursively rewrite the specified [`SelectStatement`], expanding any wildcards or regular expressions
/// found in the projection list, `FROM` clause or `GROUP BY` clause.
pub(crate) fn rewrite_statement(
@ -497,6 +533,7 @@ pub(crate) fn rewrite_statement(
let mut stmt = q.clone();
rewrite_from(&mut stmt, Arc::clone(&schema))?;
rewrite_field_list(&mut stmt, schema)?;
rewrite_field_list_aliases(&mut stmt.fields)?;
Ok(stmt)
}
@ -525,7 +562,32 @@ mod test {
// Exact, match
let stmt = parse_select("SELECT usage_user FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(stmt.to_string(), "SELECT usage_user::float FROM cpu");
assert_eq!(
stmt.to_string(),
"SELECT usage_user::float AS usage_user FROM cpu"
);
// Duplicate columns do not have conflicting aliases
let stmt = parse_select("SELECT usage_user, usage_user FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu"
);
// Multiple aliases with no conflicts
let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu"
);
// Multiple aliases with conflicts
let stmt =
parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(stmt.to_string(), "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu");
// Rewriting FROM clause
@ -534,7 +596,7 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer FROM disk, diskio"
"SELECT bytes_free::integer AS bytes_free FROM disk, diskio"
);
// Exact, no match
@ -554,14 +616,14 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT host::tag, region::tag, usage_idle::float, usage_system::float, usage_user::float FROM cpu"
"SELECT host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
);
let stmt = parse_select("SELECT * FROM cpu, disk");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer, bytes_used::integer, host::tag, region::tag, usage_idle::float, usage_system::float, usage_user::float FROM cpu, disk"
"SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
);
// Regular expression selects fields from multiple measurements
@ -569,20 +631,23 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer, bytes_used::integer, usage_idle::float, usage_system::float, usage_user::float FROM cpu, disk"
"SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
);
// Selective wildcard for tags
let stmt = parse_select("SELECT *::tag FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(stmt.to_string(), "SELECT host::tag, region::tag FROM cpu");
assert_eq!(
stmt.to_string(),
"SELECT host::tag AS host, region::tag AS region FROM cpu"
);
// Selective wildcard for fields
let stmt = parse_select("SELECT *::field FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float, usage_system::float, usage_user::float FROM cpu"
"SELECT usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
);
// Mixed fields and wildcards
@ -590,7 +655,7 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float, host::tag, region::tag FROM cpu"
"SELECT usage_idle::float AS usage_idle, host::tag AS host, region::tag AS region FROM cpu"
);
// GROUP BY expansion
@ -599,14 +664,14 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float FROM cpu GROUP BY host"
"SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host"
);
let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float FROM cpu GROUP BY host, region"
"SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host, region"
);
// Fallible
@ -623,7 +688,7 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float FROM (SELECT usage_idle::float FROM cpu)"
"SELECT usage_idle::float AS usage_idle FROM (SELECT usage_idle::float FROM cpu)"
);
// Subquery, regex, match
@ -631,7 +696,7 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer FROM (SELECT bytes_free::integer FROM disk, diskio)"
"SELECT bytes_free::integer AS bytes_free FROM (SELECT bytes_free::integer FROM disk, diskio)"
);
// Subquery, exact, no match
@ -639,7 +704,7 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle FROM (SELECT usage_idle )"
"SELECT usage_idle AS usage_idle FROM (SELECT usage_idle )"
);
// Subquery, regex, no match
@ -647,7 +712,7 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free FROM (SELECT bytes_free )"
"SELECT bytes_free AS bytes_free FROM (SELECT bytes_free )"
);
// Binary expression
@ -655,7 +720,15 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer + bytes_used::integer FROM disk"
"SELECT bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk"
);
// Unary expressions
let stmt = parse_select("SELECT -bytes_free FROM disk");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT -bytes_free::integer AS bytes_free FROM disk"
);
// Call expressions
@ -664,14 +737,22 @@ mod test {
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_i64::integer) FROM temp_01"
"SELECT COUNT(field_i64::integer) AS COUNT FROM temp_01"
);
// Duplicate aggregate columns
let stmt = parse_select("SELECT COUNT(field_i64), COUNT(field_i64) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_i64::integer) AS COUNT, COUNT(field_i64::integer) AS COUNT_1 FROM temp_01"
);
let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_f64::float) FROM temp_01"
"SELECT COUNT(field_f64::float) AS COUNT FROM temp_01"
);
// Expands all fields

View File

@ -74,6 +74,54 @@ impl MockSchemaProvider {
pub(crate) fn new_schema_provider() -> Arc<dyn SchemaProvider> {
Arc::new(Self {})
}
/// Return the chunks that make up the test database.
pub(crate) fn table_chunks() -> Vec<TestChunk> {
    // Every chunk is created quiet so tests do not dump record-batch data
    // to STDOUT when rows are added.
    let chunk = |table: &str| TestChunk::new(table).with_quiet();
    vec![
        chunk("cpu")
            .with_tag_column("host")
            .with_tag_column("region")
            .with_f64_field_column("usage_user")
            .with_f64_field_column("usage_system")
            .with_f64_field_column("usage_idle"),
        chunk("disk")
            .with_tag_column("host")
            .with_tag_column("region")
            .with_i64_field_column("bytes_used")
            .with_i64_field_column("bytes_free"),
        chunk("diskio")
            .with_tag_column("host")
            .with_tag_column("region")
            .with_tag_column("status")
            .with_i64_field_column("bytes_read")
            .with_i64_field_column("bytes_written")
            .with_f64_field_column("read_utilization")
            .with_f64_field_column("write_utilization")
            .with_bool_field_column("is_local"),
        // Schemas for testing merged schemas
        chunk("temp_01")
            .with_tag_column("shared_tag0")
            .with_tag_column("shared_tag1")
            .with_f64_field_column("shared_field0")
            .with_f64_field_column("field_f64")
            .with_i64_field_column("field_i64")
            .with_string_field_column_with_stats("field_str", None, None),
        chunk("temp_02")
            .with_tag_column("shared_tag0")
            .with_tag_column("shared_tag1")
            .with_i64_field_column("shared_field0"),
        chunk("temp_03")
            .with_tag_column("shared_tag0")
            .with_tag_column("shared_tag1")
            .with_string_field_column_with_stats("shared_field0", None, None),
    ]
}
}
impl SchemaProvider for MockSchemaProvider {
@ -82,77 +130,18 @@ impl SchemaProvider for MockSchemaProvider {
}
fn table_names(&self) -> Vec<String> {
vec![
"cpu".into(),
"disk".into(),
"diskio".into(),
"temp_01".into(),
"temp_03".into(),
"temp_03".into(),
]
.into_iter()
.sorted()
.collect::<Vec<_>>()
Self::table_chunks()
.iter()
.map(|c| c.table_name().into())
.sorted()
.collect::<Vec<_>>()
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let schema = match name {
"cpu" => Some(
TestChunk::new("cpu")
.with_tag_column("host")
.with_tag_column("region")
.with_f64_field_column("usage_user")
.with_f64_field_column("usage_system")
.with_f64_field_column("usage_idle")
.schema(),
),
"disk" => Some(
TestChunk::new("disk")
.with_tag_column("host")
.with_tag_column("region")
.with_i64_field_column("bytes_used")
.with_i64_field_column("bytes_free")
.schema(),
),
"diskio" => Some(
TestChunk::new("diskio")
.with_tag_column("host")
.with_tag_column("region")
.with_tag_column("status")
.with_i64_field_column("bytes_read")
.with_i64_field_column("bytes_written")
.with_f64_field_column("read_utilization")
.with_f64_field_column("write_utilization")
.with_bool_field_column("is_local")
.schema(),
),
// Schemas for testing merged schemas
"temp_01" => Some(
TestChunk::new("temp_01")
.with_tag_column("shared_tag0")
.with_tag_column("shared_tag1")
.with_f64_field_column("shared_field0")
.with_f64_field_column("field_f64")
.with_i64_field_column("field_i64")
.with_string_field_column_with_stats("field_str", None, None)
.schema(),
),
"temp_02" => Some(
TestChunk::new("temp_02")
.with_tag_column("shared_tag0")
.with_tag_column("shared_tag1")
.with_i64_field_column("shared_field0")
.schema(),
),
"temp_03" => Some(
TestChunk::new("temp_03")
.with_tag_column("shared_tag0")
.with_tag_column("shared_tag1")
.with_string_field_column_with_stats("shared_field0", None, None)
.schema(),
),
_ => None,
};
let schema = Self::table_chunks()
.iter()
.find(|c| c.table_name() == name)
.map(|c| c.schema());
match schema {
Some(s) => Some(Arc::new(EmptyTable::new(Arc::clone(s.inner())))),

View File

@ -10,7 +10,7 @@ pub(crate) fn var_ref_data_type_to_field_type(v: VarRefDataType) -> Option<Influ
VarRefDataType::Float => Some(InfluxFieldType::Float),
VarRefDataType::String => Some(InfluxFieldType::String),
VarRefDataType::Boolean => Some(InfluxFieldType::Boolean),
VarRefDataType::Tag | VarRefDataType::Field => None,
VarRefDataType::Tag | VarRefDataType::Field | VarRefDataType::Timestamp => None,
}
}
@ -54,6 +54,7 @@ mod test {
);
assert!(var_ref_data_type_to_field_type(VarRefDataType::Field).is_none());
assert!(var_ref_data_type_to_field_type(VarRefDataType::Tag).is_none());
assert!(var_ref_data_type_to_field_type(VarRefDataType::Timestamp).is_none());
}
#[test]

View File

@ -223,6 +223,9 @@ pub struct TestChunk {
/// The partition sort key of this chunk
partition_sort_key: Option<SortKey>,
/// Suppress output
quiet: bool,
}
/// Implements a method for adding a column with default stats
@ -304,9 +307,16 @@ impl TestChunk {
sort_key: None,
partition_sort_key: None,
partition_id: PartitionId::new(0),
quiet: false,
}
}
/// Returns the receiver configured to suppress any output to STDOUT.
pub fn with_quiet(mut self) -> Self {
self.quiet = true;
self
}
pub fn with_id(mut self, id: u128) -> Self {
self.id = ChunkId::new_test(id);
self
@ -618,7 +628,9 @@ impl TestChunk {
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
println!("TestChunk batch data: {:#?}", batch);
if !self.quiet {
println!("TestChunk batch data: {:#?}", batch);
}
self.table_data.push(Arc::new(batch));
self
@ -656,7 +668,9 @@ impl TestChunk {
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
println!("TestChunk batch data: {:#?}", batch);
if !self.quiet {
println!("TestChunk batch data: {:#?}", batch);
}
self.table_data.push(Arc::new(batch));
self