feat: InfluxQL planner learns how to normalise InfluxQL AST (#6236)

* chore: Move logic to context, in line with DataFusion SQL

* chore: Add ordering for InfluxQL data types

Ordering is used to determine automatic casting operations. If two
field columns are present in an expression, one float and one integer,
the integer should be cast to a float, such that the final expression
will be a float.

* chore: Add DerefMut trait to collection types

Will allow these collections to be mutated when traversing the InfluxQL
AST.

* chore: Add influxql module with initial AST normalisation implementation

* chore: Add more unit tests and docs

* chore: Run cargo hakari tasks

* chore: Fix link

* chore: Support regular expression expansion and Call expressions

* chore: Add tests for walk_expr functions

* chore: Add insta snapshot files

* chore: Add docs and make API accessible to the crate

* chore: Move to Arc<dyn SchemaProvider> for use in influxql planner

* chore: Move code back; it is better encapsulated here

* chore: Remove redundant attribute

* chore: Improve regex compatibility with InfluxQL / Go

* chore: Style improvement.

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
pull/24376/head
Stuart Carnie 2023-01-03 10:48:21 +11:00 committed by GitHub
parent 0aacef3c59
commit 4add55d39e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1767 additions and 6 deletions

2
Cargo.lock generated
View File

@ -2706,6 +2706,7 @@ dependencies = [
"futures",
"hashbrown 0.13.1",
"influxdb_influxql_parser",
"insta",
"itertools",
"object_store",
"observability_deps",
@ -2713,6 +2714,7 @@ dependencies = [
"parquet_file",
"predicate",
"query_functions",
"regex",
"schema",
"snafu",
"test_helpers",

View File

@ -435,6 +435,12 @@ impl<T> Deref for OneOrMore<T> {
}
}
// Mutable access to the inner `Vec<T>`, complementing the `Deref` impl above.
// Allows the collection to be mutated in place while rewriting the InfluxQL AST.
impl<T> DerefMut for OneOrMore<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.contents
    }
}
impl<T: Parser> OneOrMore<T> {
/// Parse a list of one or more `T`, separated by commas.
///
@ -499,6 +505,12 @@ impl<T> Deref for ZeroOrMore<T> {
}
}
// Mutable access to the inner `Vec<T>`, complementing the `Deref` impl above.
// Allows the collection to be mutated in place while rewriting the InfluxQL AST.
impl<T> DerefMut for ZeroOrMore<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.contents
    }
}
impl<T: Parser> ZeroOrMore<T> {
/// Parse a list of one or more `T`, separated by commas.
///

View File

@ -175,20 +175,34 @@ impl Display for WildcardType {
/// InfluxQL only supports casting between [`Self::Float`] and [`Self::Integer`] types.
///
/// [cast]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#cast-operations
// NOTE: the order of the variants is significant. `PartialOrd`/`Ord` derive
// their ordering from declaration order, and that ordering defines automatic
// cast precedence: when a field resolves to multiple types, the variant that
// sorts first wins (e.g. an integer field mixed with a float becomes a float).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum VarRefDataType {
    /// Represents a 64-bit float.
    Float,
    /// Represents a 64-bit integer.
    Integer,
    /// Represents a 64-bit unsigned integer.
    Unsigned,
    /// Represents a UTF-8 string.
    String,
    /// Represents a boolean.
    Boolean,
    /// Represents a field.
    Field,
    /// Represents a tag.
    Tag,
}

impl VarRefDataType {
    /// Returns true if the receiver is a data type that identifies as a field type.
    pub fn is_field_type(&self) -> bool {
        // Every variant declared before `Tag` is a field type,
        // including the generic `Field` marker itself.
        *self < Self::Tag
    }

    /// Returns true if the receiver is a data type that identifies as a tag type.
    pub fn is_tag_type(&self) -> bool {
        *self == Self::Tag
    }
}
impl Display for VarRefDataType {
@ -196,6 +210,7 @@ impl Display for VarRefDataType {
match self {
Self::Float => f.write_str("float"),
Self::Integer => f.write_str("integer"),
Self::Unsigned => f.write_str("unsigned"),
Self::String => f.write_str("string"),
Self::Boolean => f.write_str("boolean"),
Self::Tag => f.write_str("tag"),
@ -781,4 +796,34 @@ mod test {
"foo::field"
);
}
#[test]
fn test_var_ref_data_type() {
    use VarRefDataType::*;

    // Cast precedence follows declaration order: earlier variants sort first.
    let precedence = [Float, Integer, Unsigned, String, Boolean, Field, Tag];
    for pair in precedence.windows(2) {
        assert!(
            pair[0] < pair[1],
            "{:?} should sort before {:?}",
            pair[0],
            pair[1]
        );
    }

    // Every variant except `Tag` is a field type, and none of them is a tag type.
    for dt in [Float, Integer, Unsigned, String, Boolean, Field] {
        assert!(dt.is_field_type());
        assert!(!dt.is_tag_type());
    }

    // `Tag` is the sole tag type and never a field type.
    assert!(Tag.is_tag_type());
    assert!(!Tag.is_field_type());
}
}

View File

@ -32,6 +32,7 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
query_functions = { path = "../query_functions"}
regex = "1"
schema = { path = "../schema" }
snafu = "0.7"
tokio = { version = "1.22", features = ["macros", "parking_lot"] }
@ -43,3 +44,4 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }
assert_matches = "1"
insta = { version = "1", features = ["yaml"] }

View File

@ -2,9 +2,13 @@ use std::sync::Arc;
use crate::exec::context::IOxSessionContext;
use crate::plan::influxql::InfluxQLToLogicalPlan;
use crate::{debug, DataFusionError, QueryNamespace};
use datafusion::{error::Result, physical_plan::ExecutionPlan};
use crate::QueryNamespace;
use datafusion::{
error::{DataFusionError, Result},
physical_plan::ExecutionPlan,
};
use influxdb_influxql_parser::parse_statements;
use observability_deps::tracing::debug;
/// This struct can create plans for running SQL queries against databases
#[derive(Debug, Default)]
@ -31,7 +35,7 @@ impl InfluxQLQueryPlanner {
if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
"The context currently only supports a single InfluxQL statement".to_string(),
));
}

View File

@ -1,3 +1,10 @@
mod expr_type_evaluator;
mod field;
mod field_mapper;
mod rewriter;
mod test_utils;
mod var_ref;
use crate::{DataFusionError, IOxSessionContext, QueryNamespace};
use datafusion::common::Result;
use datafusion::execution::context::SessionState;

View File

@ -0,0 +1,276 @@
use crate::plan::influxql::field::field_by_name;
use crate::plan::influxql::field_mapper::FieldMapper;
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Expr, UnaryOperator, VarRefDataType};
use influxdb_influxql_parser::literal::Literal;
use influxdb_influxql_parser::select::{Dimension, FromMeasurementClause, MeasurementSelection};
use itertools::Itertools;
/// Evaluate the type of the specified expression.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4796-L4797).
pub(crate) fn evaluate_type(
    expr: &Expr,
    from: &FromMeasurementClause,
    fm: &dyn FieldMapper,
) -> Result<Option<VarRefDataType>> {
    let evaluator = TypeEvaluator::new(from, fm);
    evaluator.eval_type(expr)
}
/// Evaluates the [`VarRefDataType`] of InfluxQL expressions by resolving
/// column names against the measurements (and subqueries) of a `FROM` clause.
struct TypeEvaluator<'a> {
    // Resolves field and tag types from the underlying schema.
    fm: &'a dyn FieldMapper,
    // The `FROM` clause whose measurements are searched for column types.
    from: &'a FromMeasurementClause,
}

impl<'a> TypeEvaluator<'a> {
    fn new(from: &'a FromMeasurementClause, fm: &'a dyn FieldMapper) -> Self {
        Self { from, fm }
    }

    /// Determine the data type of `expr`, or `None` when it cannot be resolved.
    fn eval_type(&self, expr: &Expr) -> Result<Option<VarRefDataType>> {
        Ok(match expr {
            Expr::VarRef { name, data_type } => self.eval_var_ref(name.as_str(), data_type)?,
            Expr::Call { name, args } => self.eval_call(name.as_str(), args)?,
            // NOTE: This is a deviation from https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4635,
            // as we'll let DataFusion determine the column type and if the types are compatible.
            Expr::Binary { .. } => None,
            Expr::Nested(expr) => self.eval_type(expr)?,
            // NOTE(review): there is no `Literal::Integer` arm; integer literals
            // appear to surface as `Unsigned` at this parser revision — confirm.
            Expr::Literal(Literal::Float(_)) => Some(VarRefDataType::Float),
            Expr::Literal(Literal::Unsigned(_)) => Some(VarRefDataType::Unsigned),
            Expr::Literal(Literal::String(_)) => Some(VarRefDataType::String),
            Expr::Literal(Literal::Boolean(_)) => Some(VarRefDataType::Boolean),
            // Negating an unsigned operand yields a (signed) integer; any other
            // unary operation preserves the operand's type.
            Expr::UnaryOp(op, expr) => match (op, self.eval_type(expr)?) {
                (UnaryOperator::Minus, Some(VarRefDataType::Unsigned)) => {
                    Some(VarRefDataType::Integer)
                }
                (_, Some(ft)) => Some(ft),
                (_, None) => None,
            },
            // Remaining patterns are not valid field types
            Expr::BindParameter(_)
            | Expr::Distinct(_)
            | Expr::Wildcard(_)
            | Expr::Literal(Literal::Duration(_))
            | Expr::Literal(Literal::Regex(_)) => None,
        })
    }

    /// Returns the type for the specified [`Expr`].
    /// This function assumes that the expression has already been reduced.
    ///
    /// An explicit field cast (`integer`, `float`, `string`, `boolean`) is
    /// returned as-is; otherwise the name is resolved against each measurement
    /// and subquery in the `FROM` clause, keeping the type with the highest
    /// precedence (lowest `VarRefDataType` ordinal) on conflict.
    fn eval_var_ref(
        &self,
        name: &str,
        data_type: &Option<VarRefDataType>,
    ) -> Result<Option<VarRefDataType>> {
        Ok(match data_type {
            // NOTE(review): `Unsigned` is not accepted as an explicit cast here
            // and falls through to schema resolution — confirm this matches the
            // upstream Go behaviour.
            Some(dt)
                if matches!(
                    dt,
                    VarRefDataType::Integer
                        | VarRefDataType::Float
                        | VarRefDataType::String
                        | VarRefDataType::Boolean
                ) =>
            {
                Some(*dt)
            }
            _ => {
                let mut data_type: Option<VarRefDataType> = None;
                for ms in self.from.iter() {
                    match ms {
                        MeasurementSelection::Name(QualifiedMeasurementName {
                            name: MeasurementName::Name(ident),
                            ..
                        }) => match (data_type, self.fm.map_type(ident.as_str(), name)?) {
                            // A lower ordinal wins: keep the higher-precedence type.
                            (Some(existing), Some(res)) => {
                                if res < existing {
                                    data_type = Some(res)
                                }
                            }
                            (None, Some(res)) => data_type = Some(res),
                            _ => continue,
                        },
                        MeasurementSelection::Subquery(select) => {
                            // find the field by name
                            if let Some(field) = field_by_name(select, name) {
                                match (
                                    data_type,
                                    evaluate_type(&field.expr, &select.from, self.fm)?,
                                ) {
                                    (Some(existing), Some(res)) => {
                                        if res < existing {
                                            data_type = Some(res)
                                        }
                                    }
                                    (None, Some(res)) => data_type = Some(res),
                                    _ => {}
                                }
                            };

                            // If no projected field matched, the name may refer
                            // to a tag exposed via the subquery's GROUP BY clause.
                            if data_type.is_none() {
                                if let Some(group_by) = &select.group_by {
                                    if group_by.iter().any(|dim| {
                                        matches!(dim, Dimension::Tag(ident) if ident.as_str() == name)
                                    }) {
                                        data_type = Some(VarRefDataType::Tag);
                                    }
                                }
                            }
                        }
                        _ => {
                            return Err(DataFusionError::Internal(
                                "eval_var_ref: Unexpected MeasurementSelection".to_string(),
                            ))
                        }
                    }
                }
                data_type
            }
        })
    }

    /// Evaluate the datatype of the function identified by `name`.
    ///
    /// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4693)
    /// and [here](https://github.com/influxdata/influxdb/blob/37088e8f5330bec0f08a376b2cb945d02a296f4e/influxql/query/functions.go#L50).
    fn eval_call(&self, name: &str, args: &[Expr]) -> Result<Option<VarRefDataType>> {
        // Evaluate the data types of the arguments
        let arg_types: Vec<_> = args.iter().map(|expr| self.eval_type(expr)).try_collect()?;

        // `mean` and `count` have fixed result types; `min`/`max`/`sum`/
        // `first`/`last` adopt the type of their first argument.
        Ok(match name.to_ascii_lowercase().as_str() {
            "mean" => Some(VarRefDataType::Float),
            "count" => Some(VarRefDataType::Integer),
            "min" | "max" | "sum" | "first" | "last" => match arg_types.first() {
                Some(v) => *v,
                None => None,
            },
            _ => None,
        })
    }
}
#[cfg(test)]
mod test {
    use crate::plan::influxql::expr_type_evaluator::evaluate_type;
    use crate::plan::influxql::field_mapper::{FieldMapper, SchemaFieldMapper};
    use crate::plan::influxql::test_utils::{parse_select, MockSchemaProvider};
    use assert_matches::assert_matches;
    use influxdb_influxql_parser::expression::VarRefDataType;

    /// Parse `sql` and evaluate the data type of the first projected field.
    fn eval_first_field(fm: &dyn FieldMapper, sql: &str) -> Option<VarRefDataType> {
        let stmt = parse_select(sql);
        let field = stmt.fields.head().unwrap();
        evaluate_type(&field.expr, &stmt.from, fm).unwrap()
    }

    #[test]
    fn test_evaluate_type() {
        let fm =
            &SchemaFieldMapper::new(MockSchemaProvider::new_schema_provider()) as &dyn FieldMapper;

        assert_matches!(
            eval_first_field(fm, "SELECT shared_field0 FROM temp_01").unwrap(),
            VarRefDataType::Float
        );
        assert_matches!(
            eval_first_field(fm, "SELECT shared_tag0 FROM temp_01").unwrap(),
            VarRefDataType::Tag
        );

        // Unknown
        assert!(eval_first_field(fm, "SELECT not_exists FROM temp_01").is_none());

        assert_matches!(
            eval_first_field(fm, "SELECT shared_field0 FROM temp_02").unwrap(),
            VarRefDataType::Integer
        );
        assert_matches!(
            eval_first_field(fm, "SELECT shared_field0 FROM temp_02").unwrap(),
            VarRefDataType::Integer
        );

        // Same field across multiple measurements resolves to the highest precedence (float)
        assert_matches!(
            eval_first_field(fm, "SELECT shared_field0 FROM temp_01, temp_02").unwrap(),
            VarRefDataType::Float
        );

        // Explicit cast of integer field to float
        assert_matches!(
            eval_first_field(fm, "SELECT SUM(field_i64::float) FROM temp_01").unwrap(),
            VarRefDataType::Float
        );

        // data types for functions
        assert_matches!(
            eval_first_field(fm, "SELECT SUM(field_f64) FROM temp_01").unwrap(),
            VarRefDataType::Float
        );
        assert_matches!(
            eval_first_field(fm, "SELECT SUM(field_i64) FROM temp_01").unwrap(),
            VarRefDataType::Integer
        );
        assert_matches!(
            eval_first_field(fm, "SELECT MIN(field_f64) FROM temp_01").unwrap(),
            VarRefDataType::Float
        );
        assert_matches!(
            eval_first_field(fm, "SELECT MAX(field_i64) FROM temp_01").unwrap(),
            VarRefDataType::Integer
        );
        assert_matches!(
            eval_first_field(fm, "SELECT FIRST(field_str) FROM temp_01").unwrap(),
            VarRefDataType::String
        );
        assert_matches!(
            eval_first_field(fm, "SELECT LAST(field_str) FROM temp_01").unwrap(),
            VarRefDataType::String
        );
        assert_matches!(
            eval_first_field(fm, "SELECT MEAN(field_i64) FROM temp_01").unwrap(),
            VarRefDataType::Float
        );
        assert_matches!(
            eval_first_field(fm, "SELECT COUNT(field_f64) FROM temp_01").unwrap(),
            VarRefDataType::Integer
        );
        assert_matches!(
            eval_first_field(fm, "SELECT COUNT(field_i64) FROM temp_01").unwrap(),
            VarRefDataType::Integer
        );
        assert_matches!(
            eval_first_field(fm, "SELECT COUNT(field_str) FROM temp_01").unwrap(),
            VarRefDataType::Integer
        );

        // subqueries
        assert_matches!(
            eval_first_field(fm, "SELECT inner FROM (SELECT field_f64 as inner FROM temp_01)")
                .unwrap(),
            VarRefDataType::Float
        );
        assert_matches!(
            eval_first_field(
                fm,
                "SELECT inner FROM (SELECT shared_tag0, field_f64 as inner FROM temp_01)"
            )
            .unwrap(),
            VarRefDataType::Float
        );
        assert_matches!(
            eval_first_field(
                fm,
                "SELECT shared_tag0, inner FROM (SELECT shared_tag0, field_f64 as inner FROM temp_01)"
            )
            .unwrap(),
            VarRefDataType::Tag
        );
    }
}

View File

@ -0,0 +1,155 @@
use influxdb_influxql_parser::expression::Expr;
use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResult};
/// Returns the name of the field.
///
/// Prefers the alias if set, otherwise derives the name
/// from [Expr::VarRef] or [Expr::Call]. Finally, if neither
/// are available, falls back to an empty string.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L3326-L3328)
pub(crate) fn field_name(f: &Field) -> String {
    if let Some(alias) = &f.alias {
        return alias.to_string();
    }

    // Descend through any levels of nesting by reference, rather than
    // deep-cloning the nested expression per level as the recursive
    // formulation did, then derive the name from the innermost expression.
    let mut expr = &f.expr;
    loop {
        match expr {
            Expr::Nested(nested) => expr = nested,
            Expr::Call { name, .. } => return name.clone(),
            Expr::Binary { .. } => return binary_expr_name(expr),
            Expr::VarRef { name, .. } => return name.to_string(),
            // Remaining expressions (literals, wildcards, …) have no name.
            _ => return "".to_string(),
        }
    }
}
/// Returns the expression that matches the field name.
///
/// If the name matches one of the arguments to
/// "top" or "bottom", the variable reference inside of the function is returned.
///
/// Derive from [this implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1725)
///
/// **NOTE**
///
/// This implementation duplicates the behavior of the original implementation, including skipping the
/// first argument. It is likely the original intended to skip the _last_ argument, which is the number
/// of rows.
pub(crate) fn field_by_name(select: &SelectStatement, name: &str) -> Option<Field> {
    // True when `name` matches one of the variable references passed to a
    // `top` or `bottom` call (skipping the first argument — see NOTE above).
    let matches_top_or_bottom_arg = |f: &Field| match &f.expr {
        Expr::Call {
            name: func_name,
            args,
        } if (func_name.eq_ignore_ascii_case("top")
            || func_name.eq_ignore_ascii_case("bottom"))
            && args.len() > 2 =>
        {
            args[1..]
                .iter()
                .any(|arg| matches!(arg, Expr::VarRef { name: n, .. } if n.as_str() == name))
        }
        _ => false,
    };

    select
        .fields
        .iter()
        .find(|f| field_name(f) == name || matches_top_or_bottom_arg(f))
        .cloned()
}
/// Collects the names of every [`Expr::VarRef`] and [`Expr::Call`] node
/// encountered during a traversal into the wrapped vector.
struct BinaryExprNameVisitor<'a>(&'a mut Vec<String>);

impl<'a> Visitor for BinaryExprNameVisitor<'a> {
    fn pre_visit_expr(self, n: &Expr) -> VisitorResult<Recursion<Self>> {
        if let Expr::Call { name, .. } = n {
            self.0.push(name.clone());
        } else if let Expr::VarRef { name, .. } = n {
            self.0.push(name.to_string());
        }
        Ok(Recursion::Continue(self))
    }
}

/// Returns the name of a binary expression by concatenating
/// the names of any [Expr::VarRef] and [Expr::Call] with underscores.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L3729-L3731)
fn binary_expr_name(expr: &Expr) -> String {
    let mut names = Vec::new();
    let visitor = BinaryExprNameVisitor(&mut names);
    expr.accept(visitor).unwrap(); // It is not expected to fail
    names.join("_")
}
#[cfg(test)]
mod test {
    use crate::plan::influxql::field::{field_by_name, field_name};
    use crate::plan::influxql::test_utils::{get_first_field, parse_select};
    use assert_matches::assert_matches;

    /// Parse `sql` and return the display form of the field named `name`, if any.
    fn lookup(sql: &str, name: &str) -> Option<String> {
        field_by_name(&parse_select(sql), name).map(|f| f.to_string())
    }

    #[test]
    fn test_field_name() {
        assert_eq!(field_name(&get_first_field("SELECT usage FROM cpu")), "usage");

        // The alias takes precedence over the derived name.
        assert_eq!(field_name(&get_first_field("SELECT usage as u2 FROM cpu")), "u2");

        // Nesting is transparent.
        assert_eq!(field_name(&get_first_field("SELECT (usage) FROM cpu")), "usage");

        // A call is named after the function.
        assert_eq!(field_name(&get_first_field("SELECT COUNT(usage) FROM cpu")), "COUNT");

        // A binary expression joins all call and var-ref names with underscores.
        assert_eq!(
            field_name(&get_first_field("SELECT COUNT(usage) + SUM(usage_idle) FROM cpu")),
            "COUNT_usage_SUM_usage_idle"
        );

        // Literal-only expressions have no name.
        assert_eq!(field_name(&get_first_field("SELECT 1+2 FROM cpu")), "");
    }

    #[test]
    fn test_field_by_name() {
        assert_eq!(lookup("SELECT usage, idle FROM cpu", "usage").unwrap(), "usage");
        assert_eq!(
            lookup("SELECT usage as foo, usage FROM cpu", "foo").unwrap(),
            "usage AS foo"
        );
        assert_eq!(
            lookup("SELECT top(idle, usage, 5), usage FROM cpu", "usage").unwrap(),
            "top(idle, usage, 5)"
        );
        assert_eq!(
            lookup("SELECT bottom(idle, usage, 5), usage FROM cpu", "usage").unwrap(),
            "bottom(idle, usage, 5)"
        );
        assert_eq!(
            lookup("SELECT top(idle, usage, 5) as foo, usage FROM cpu", "usage").unwrap(),
            "top(idle, usage, 5) AS foo"
        );
        assert_eq!(
            lookup("SELECT top(idle, usage, 5) as foo, usage FROM cpu", "foo").unwrap(),
            "top(idle, usage, 5) AS foo"
        );

        // Not exists
        assert_matches!(lookup("SELECT usage, idle FROM cpu", "bar"), None);

        // Does not match name by first argument to top or bottom, per
        // bug in original implementation.
        assert_matches!(lookup("SELECT top(foo, usage, 5), idle FROM cpu", "foo"), None);
        assert_eq!(
            lookup("SELECT top(foo, usage, 5), idle FROM cpu", "idle").unwrap(),
            "idle"
        );
    }
}

View File

@ -0,0 +1,126 @@
#![allow(dead_code)]
use crate::plan::influxql::var_ref::field_type_to_var_ref_data_type;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::expression::VarRefDataType;
use schema::{InfluxColumnType, Schema};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
/// Maps a field name to its [`VarRefDataType`].
pub(crate) type FieldTypeMap = HashMap<String, VarRefDataType>;
/// The set of tag-key names of a measurement.
pub(crate) type TagSet = HashSet<String>;

/// Represents an InfluxQL schema for determining the fields and tags
/// of a measurement.
pub(crate) trait FieldMapper {
    /// Determine the fields and tags for the given measurement name.
    ///
    /// Returns `Ok(None)` when the measurement does not exist.
    fn field_and_dimensions(&self, name: &str) -> Result<Option<(FieldTypeMap, TagSet)>>;

    /// Determine the [`VarRefDataType`] for the given field name.
    ///
    /// Returns `Ok(None)` when the measurement or column does not exist, or
    /// when the column is not a field or tag (e.g. the timestamp).
    fn map_type(&self, name: &str, field: &str) -> Result<Option<VarRefDataType>>;
}
/// A [`FieldMapper`] backed by a DataFusion [`SchemaProvider`].
pub(crate) struct SchemaFieldMapper {
    schema: Arc<dyn SchemaProvider>,
}

impl SchemaFieldMapper {
    /// Create a new `SchemaFieldMapper` for the specified schema provider.
    pub(crate) fn new(schema: Arc<dyn SchemaProvider>) -> Self {
        Self { schema }
    }

    /// Look up `name` in the catalog and convert the table's Arrow schema to
    /// an IOx [`Schema`], or return `None` if the table does not exist.
    fn get_schema(&self, name: &str) -> Result<Option<Schema>> {
        let table = match self.schema.table(name) {
            Some(table) => table,
            None => return Ok(None),
        };

        let schema = Schema::try_from(table.schema()).map_err(|e| {
            DataFusionError::Internal(format!("Unable to create IOx schema: {}", e))
        })?;
        Ok(Some(schema))
    }
}
impl FieldMapper for SchemaFieldMapper {
    fn field_and_dimensions(&self, name: &str) -> Result<Option<(FieldTypeMap, TagSet)>> {
        let iox = match self.get_schema(name)? {
            Some(iox) => iox,
            None => return Ok(None),
        };

        // Fields become (name → data type) entries; tags form a plain name set.
        let fields = iox
            .iter()
            .filter_map(|(col_type, f)| match col_type {
                InfluxColumnType::Field(ft) => {
                    Some((f.name().clone(), field_type_to_var_ref_data_type(ft)))
                }
                _ => None,
            })
            .collect::<FieldTypeMap>();
        let tags = iox.tags_iter().map(|f| f.name().clone()).collect::<TagSet>();

        Ok(Some((fields, tags)))
    }

    fn map_type(&self, measurement_name: &str, field: &str) -> Result<Option<VarRefDataType>> {
        let iox = match self.get_schema(measurement_name)? {
            Some(iox) => iox,
            None => return Ok(None),
        };

        // Resolve the column, mapping the timestamp to `None`.
        Ok(iox.find_index_of(field).and_then(|i| match iox.field(i).0 {
            InfluxColumnType::Field(ft) => Some(field_type_to_var_ref_data_type(ft)),
            InfluxColumnType::Tag => Some(VarRefDataType::Tag),
            InfluxColumnType::Timestamp => None,
        }))
    }
}
#[cfg(test)]
mod test {
    use crate::plan::influxql::field_mapper::{
        FieldMapper, FieldTypeMap, SchemaFieldMapper, TagSet,
    };
    use crate::plan::influxql::test_utils::MockSchemaProvider;
    use assert_matches::assert_matches;
    use influxdb_influxql_parser::expression::VarRefDataType;

    /// Validates `SchemaFieldMapper` against the mock catalog: field and tag
    /// discovery for an existing measurement, and `None` results for unknown
    /// measurements and columns.
    #[test]
    fn test_schema_field_mapper() {
        let fm =
            &SchemaFieldMapper::new(MockSchemaProvider::new_schema_provider()) as &dyn FieldMapper;

        // Measurement exists
        let (field_set, tag_set) = fm.field_and_dimensions("cpu").unwrap().unwrap();
        assert_eq!(
            field_set,
            FieldTypeMap::from([
                ("usage_user".to_string(), VarRefDataType::Float),
                ("usage_system".to_string(), VarRefDataType::Float),
                ("usage_idle".to_string(), VarRefDataType::Float),
            ])
        );
        assert_eq!(
            tag_set,
            TagSet::from(["host".to_string(), "region".to_string()])
        );

        // Measurement does not exist
        assert!(fm.field_and_dimensions("cpu2").unwrap().is_none());

        // `map_type` API calls

        // Returns expected type
        assert_matches!(
            fm.map_type("cpu", "usage_user").unwrap(),
            Some(VarRefDataType::Float)
        );
        assert_matches!(
            fm.map_type("cpu", "host").unwrap(),
            Some(VarRefDataType::Tag)
        );

        // Returns None for nonexistent field
        assert!(fm.map_type("cpu", "nonexistent").unwrap().is_none());

        // Returns None for nonexistent measurement
        assert!(fm.map_type("nonexistent", "usage").unwrap().is_none());
    }
}

View File

@ -0,0 +1,828 @@
#![allow(dead_code)]
use crate::plan::influxql::expr_type_evaluator::evaluate_type;
use crate::plan::influxql::field::field_name;
use crate::plan::influxql::field_mapper::{FieldMapper, FieldTypeMap, SchemaFieldMapper, TagSet};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Expr, VarRefDataType, WildcardType};
use influxdb_influxql_parser::identifier::Identifier;
use influxdb_influxql_parser::literal::Literal;
use influxdb_influxql_parser::select::{
Dimension, Field, FieldList, FromMeasurementClause, GroupByClause, MeasurementSelection,
SelectStatement,
};
use influxdb_influxql_parser::string::Regex;
use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResult};
use itertools::Itertools;
use query_functions::clean_non_meta_escapes;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;
/// Compile an InfluxQL regular expression into a Rust [`regex::Regex`],
/// first cleaning escapes that are not regex meta-characters for
/// compatibility with the Go implementation.
fn parse_regex(re: &Regex) -> Result<regex::Regex> {
    let pattern = clean_non_meta_escapes(re.as_str());
    match regex::Regex::new(&pattern) {
        Ok(compiled) => Ok(compiled),
        Err(e) => Err(DataFusionError::External(
            format!("invalid regular expression '{}': {}", re, e).into(),
        )),
    }
}
/// Recursively expand the `from` clause of `stmt` and any subqueries.
///
/// Measurement names are retained only when they exist in `schema`; regular
/// expressions are expanded to the set of matching table names; subqueries
/// are rewritten in place.
fn rewrite_from(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) -> Result<()> {
    let mut new_from = Vec::new();
    // NOTE: the original shadowed `schema` with `&Arc::clone(&schema)`, which
    // is a redundant refcount bump and reborrow of an owned parameter; the
    // parameter is used directly instead.
    for ms in stmt.from.iter() {
        match ms {
            MeasurementSelection::Name(qmn) => match qmn {
                QualifiedMeasurementName {
                    name: MeasurementName::Name(name),
                    ..
                } => {
                    // Only retain measurements that exist in the schema.
                    if schema.table(name).is_some() {
                        new_from.push(ms.clone())
                    }
                }
                QualifiedMeasurementName {
                    name: MeasurementName::Regex(re),
                    ..
                } => {
                    // Expand the regex to all matching table names.
                    let re = parse_regex(re)?;
                    schema
                        .table_names()
                        .into_iter()
                        .filter(|table| re.is_match(table.as_str()))
                        .for_each(|table| {
                            new_from.push(MeasurementSelection::Name(QualifiedMeasurementName {
                                database: None,
                                retention_policy: None,
                                name: MeasurementName::Name(table.into()),
                            }))
                        });
                }
            },
            MeasurementSelection::Subquery(q) => {
                let mut q = *q.clone();
                rewrite_from(&mut q, Arc::clone(&schema))?;
                new_from.push(MeasurementSelection::Subquery(Box::new(q)))
            }
        }
    }
    stmt.from = FromMeasurementClause::new(new_from);
    Ok(())
}
/// Determine the merged fields and tags of the `FROM` clause.
///
/// When the same field name resolves to different types across measurements
/// or subqueries, the type with the higher precedence (lower
/// [`VarRefDataType`] ordinal) is kept.
fn from_field_and_dimensions(
    from: &FromMeasurementClause,
    schema: Arc<dyn SchemaProvider>,
) -> Result<(FieldTypeMap, TagSet)> {
    let mut fs = FieldTypeMap::new();
    let mut ts = TagSet::new();
    let fm = &SchemaFieldMapper::new(schema) as &dyn FieldMapper;

    for ms in from.deref() {
        match ms {
            MeasurementSelection::Name(QualifiedMeasurementName {
                name: MeasurementName::Name(name),
                ..
            }) => {
                let (field_set, tag_set) = match fm.field_and_dimensions(name.as_str())? {
                    Some(res) => res,
                    None => continue,
                };

                // Merge field_set with existing, using the entry API to avoid
                // the separate `get` + `insert` double hash lookup.
                for (name, ft) in &field_set {
                    fs.entry(name.clone())
                        .and_modify(|existing| {
                            if *ft < *existing {
                                *existing = *ft;
                            }
                        })
                        .or_insert(*ft);
                }

                ts.extend(tag_set);
            }
            MeasurementSelection::Subquery(select) => {
                // A subquery contributes the types of its projected fields …
                for f in select.fields.iter() {
                    let dt = match evaluate_type(&f.expr, &select.from, fm)? {
                        Some(dt) => dt,
                        None => continue,
                    };

                    fs.entry(field_name(f))
                        .and_modify(|existing| {
                            if dt < *existing {
                                *existing = dt;
                            }
                        })
                        .or_insert(dt);
                }

                if let Some(group_by) = &select.group_by {
                    // Merge the dimensions from the subquery
                    ts.extend(group_by.iter().filter_map(|d| match d {
                        Dimension::Tag(ident) => Some(ident.to_string()),
                        _ => None,
                    }));
                }
            }
            _ => {
                // Unreachable, as the from clause should be normalised at this point.
                return Err(DataFusionError::Internal(
                    "Unexpected MeasurementSelection in from".to_string(),
                ));
            }
        }
    }
    Ok((fs, ts))
}
/// Returns a tuple indicating whether the specified `SELECT` statement
/// has any wildcards or regular expressions in the projection list
/// and `GROUP BY` clause respectively.
fn has_wildcards(stmt: &SelectStatement) -> (bool, bool) {
    struct HasWildcardsVisitor {
        projection: bool,
        group_by: bool,
    }

    impl Visitor for HasWildcardsVisitor {
        fn pre_visit_expr(self, n: &Expr) -> VisitorResult<Recursion<Self>> {
            if matches!(n, Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_))) {
                return Ok(Recursion::Stop(Self {
                    projection: true,
                    ..self
                }));
            }
            Ok(Recursion::Continue(self))
        }

        fn pre_visit_select_from_clause(
            self,
            _n: &FromMeasurementClause,
        ) -> VisitorResult<Recursion<Self>> {
            // Don't traverse FROM and potential subqueries
            Ok(Recursion::Stop(self))
        }

        fn pre_visit_select_dimension(self, n: &Dimension) -> VisitorResult<Recursion<Self>> {
            if matches!(n, Dimension::Wildcard | Dimension::Regex(_)) {
                return Ok(Recursion::Stop(Self {
                    group_by: true,
                    ..self
                }));
            }
            Ok(Recursion::Continue(self))
        }
    }

    let res = Visitable::accept(
        stmt,
        HasWildcardsVisitor {
            projection: false,
            group_by: false,
        },
    )
    .unwrap();
    (res.projection, res.group_by)
}
/// Perform a depth-first traversal of the expression tree,
/// applying `visit` to every node after its children (post-order).
fn walk_expr_mut(expr: &mut Expr, visit: &mut impl FnMut(&mut Expr) -> Result<()>) -> Result<()> {
    // Descend into any children first …
    match expr {
        Expr::UnaryOp(_, child) | Expr::Nested(child) => walk_expr_mut(child, visit)?,
        Expr::Binary { lhs, rhs, .. } => {
            walk_expr_mut(lhs, visit)?;
            walk_expr_mut(rhs, visit)?;
        }
        Expr::Call { args, .. } => {
            for arg in args.iter_mut() {
                walk_expr_mut(arg, visit)?;
            }
        }
        // Leaf nodes have no children.
        Expr::VarRef { .. }
        | Expr::BindParameter(_)
        | Expr::Literal(_)
        | Expr::Wildcard(_)
        | Expr::Distinct(_) => {}
    }

    // … then apply the visitor to the node itself.
    visit(expr)
}
/// Perform a depth-first traversal of the expression tree,
/// applying `visit` to every node after its children (post-order).
fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>) -> Result<()> {
    // Descend into any children first …
    match expr {
        Expr::UnaryOp(_, child) | Expr::Nested(child) => walk_expr(child, visit)?,
        Expr::Binary { lhs, rhs, .. } => {
            walk_expr(lhs, visit)?;
            walk_expr(rhs, visit)?;
        }
        Expr::Call { args, .. } => {
            for arg in args.iter() {
                walk_expr(arg, visit)?;
            }
        }
        // Leaf nodes have no children.
        Expr::VarRef { .. }
        | Expr::BindParameter(_)
        | Expr::Literal(_)
        | Expr::Wildcard(_)
        | Expr::Distinct(_) => {}
    }

    // … then apply the visitor to the node itself.
    visit(expr)
}
/// Rewrite the projection list and GROUP BY of the specified `SELECT` statement.
///
/// Wildcards and regular expressions in the `SELECT` projection list and `GROUP BY` are expanded.
/// Any fields with no type specifier are rewritten with the appropriate type, if they exist in the
/// underlying schema.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1185).
fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) -> Result<()> {
// Iterate through the `FROM` clause and rewrite any subqueries first.
for ms in stmt.from.iter_mut() {
if let MeasurementSelection::Subquery(subquery) = ms {
rewrite_field_list(subquery, Arc::clone(&schema))?;
}
}
// Attempt to rewrite all variable references in the fields with their types, if one
// hasn't been specified.
let fm = &SchemaFieldMapper::new(Arc::clone(&schema)) as &dyn FieldMapper;
stmt.fields.iter_mut().try_for_each(|f| {
walk_expr_mut(&mut f.expr, &mut |e| {
if matches!(e, Expr::VarRef { .. }) {
let new_type = evaluate_type(e.borrow(), &stmt.from, fm)?;
if let Expr::VarRef { data_type, .. } = e {
*data_type = new_type;
}
}
Ok(())
})
})?;
let (has_field_wildcard, has_group_by_wildcard) = has_wildcards(stmt);
if (has_field_wildcard, has_group_by_wildcard) == (false, false) {
return Ok(());
}
let (field_set, mut tag_set) = from_field_and_dimensions(&stmt.from, Arc::clone(&schema))?;
if !has_group_by_wildcard {
if let Some(group_by) = &stmt.group_by {
// Remove any explicitly listed tags in the GROUP BY clause, so they are not expanded
// in the wildcard specified in the SELECT projection list
group_by.iter().for_each(|dim| {
if let Dimension::Tag(ident) = dim {
tag_set.remove(ident.as_str());
}
});
}
}
#[derive(PartialEq, PartialOrd, Eq, Ord)]
struct VarRef {
name: String,
data_type: VarRefDataType,
}
let fields = if !field_set.is_empty() {
let fields_iter = field_set.iter().map(|(k, v)| VarRef {
name: k.clone(),
data_type: *v,
});
if !has_group_by_wildcard {
fields_iter
.chain(tag_set.iter().map(|tag| VarRef {
name: tag.clone(),
data_type: VarRefDataType::Tag,
}))
.sorted()
.collect::<Vec<_>>()
} else {
fields_iter.sorted().collect::<Vec<_>>()
}
} else {
vec![]
};
if has_field_wildcard {
let mut new_fields = Vec::new();
for f in stmt.fields.iter() {
let add_field = |f: &VarRef| {
new_fields.push(Field {
expr: Expr::VarRef {
name: f.name.clone().into(),
data_type: Some(f.data_type),
},
alias: None,
})
};
match &f.expr {
Expr::Wildcard(wct) => {
let filter: fn(&&VarRef) -> bool = match wct {
None => |_| true,
Some(WildcardType::Tag) => |v| v.data_type.is_tag_type(),
Some(WildcardType::Field) => |v| v.data_type.is_field_type(),
};
fields.iter().filter(filter).for_each(add_field);
}
Expr::Literal(Literal::Regex(re)) => {
let re = parse_regex(re)?;
fields
.iter()
.filter(|v| re.is_match(v.name.as_str()))
.for_each(add_field);
}
Expr::Call { name, args } => {
let mut name = name;
let mut args = args;
// Search for the call with a wildcard by continuously descending until
// we no longer have a call.
while let Some(Expr::Call {
name: inner_name,
args: inner_args,
}) = args.first()
{
name = inner_name;
args = inner_args;
}
let mut supported_types = HashSet::from([
VarRefDataType::Float,
VarRefDataType::Integer,
VarRefDataType::Unsigned,
]);
// Add additional types for certain functions.
match name.to_lowercase().as_str() {
"count" | "first" | "last" | "distinct" | "elapsed" | "mode" | "sample" => {
supported_types
.extend([VarRefDataType::String, VarRefDataType::Boolean]);
}
"min" | "max" => {
supported_types.insert(VarRefDataType::Boolean);
}
"holt_winters" | "holt_winters_with_fit" => {
supported_types.remove(&VarRefDataType::Unsigned);
}
_ => {}
}
let add_field = |v: &VarRef| {
let mut args = args.clone();
args[0] = Expr::VarRef {
name: v.name.clone().into(),
data_type: Some(v.data_type),
};
new_fields.push(Field {
expr: Expr::Call {
name: name.clone(),
args,
},
alias: Some(format!("{}_{}", field_name(f), v.name).into()),
})
};
match args.first() {
Some(Expr::Wildcard(Some(WildcardType::Tag))) => {
return Err(DataFusionError::External(
format!("unable to use tag as wildcard in {}()", name).into(),
))
}
Some(Expr::Wildcard(_)) => {
fields
.iter()
.filter(|v| supported_types.contains(&v.data_type))
.for_each(add_field);
}
Some(Expr::Literal(Literal::Regex(re))) => {
let re = parse_regex(re)?;
fields
.iter()
.filter(|v| {
supported_types.contains(&v.data_type)
&& re.is_match(v.name.as_str())
})
.for_each(add_field);
}
_ => {
new_fields.push(f.clone());
continue;
}
}
}
Expr::Binary { .. } => {
let mut has_wildcard = false;
walk_expr(&f.expr, &mut |e| {
match e {
Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => {
has_wildcard = true
}
_ => {}
}
Ok(())
})?;
if has_wildcard {
return Err(DataFusionError::External(
"unsupported expression: contains a wildcard or regular expression"
.into(),
));
}
new_fields.push(f.clone());
}
_ => new_fields.push(f.clone()),
}
}
stmt.fields = FieldList::new(new_fields);
}
if has_group_by_wildcard {
let group_by_tags = if has_group_by_wildcard {
tag_set.into_iter().sorted().collect::<Vec<_>>()
} else {
vec![]
};
if let Some(group_by) = &stmt.group_by {
let mut new_dimensions = Vec::new();
for dim in group_by.iter() {
let add_dim = |dim: &String| {
new_dimensions.push(Dimension::Tag(Identifier::new(dim.clone())))
};
match dim {
Dimension::Wildcard => {
group_by_tags.iter().for_each(add_dim);
}
Dimension::Regex(re) => {
let re = parse_regex(re)?;
group_by_tags
.iter()
.filter(|dim| re.is_match(dim.as_str()))
.for_each(add_dim);
}
_ => new_dimensions.push(dim.clone()),
}
}
stmt.group_by = Some(GroupByClause::new(new_dimensions));
}
}
Ok(())
}
/// Recursively rewrite the specified [`SelectStatement`], expanding any wildcards or regular
/// expressions found in the projection list, `FROM` clause or `GROUP BY` clause.
///
/// The input statement is not modified; a rewritten copy is returned.
pub(crate) fn rewrite_statement(
    q: &SelectStatement,
    schema: Arc<dyn SchemaProvider>,
) -> Result<SelectStatement> {
    let mut stmt = q.clone();
    // Expand the `FROM` clause first, so the projection rewrite sees the final measurement list.
    rewrite_from(&mut stmt, Arc::clone(&schema))?;
    rewrite_field_list(&mut stmt, schema).map(|_| stmt)
}
#[cfg(test)]
mod test {
    use crate::plan::influxql::rewriter::{has_wildcards, rewrite_statement, walk_expr_mut};
    use crate::plan::influxql::test_utils::{get_first_field, MockSchemaProvider};
    use influxdb_influxql_parser::expression::Expr;
    use influxdb_influxql_parser::literal::Literal;
    use influxdb_influxql_parser::parse_statements;
    use influxdb_influxql_parser::select::SelectStatement;
    use influxdb_influxql_parser::statement::Statement;
    use test_helpers::assert_contains;

    /// Parse `s` and return the first statement, which must be a `SELECT`.
    fn parse_select(s: &str) -> SelectStatement {
        let statements = parse_statements(s).unwrap();
        match statements.first() {
            Some(Statement::Select(sel)) => *sel.clone(),
            _ => unreachable!(),
        }
    }

    /// Verifies wildcard / regex expansion and type annotation against the mock schema,
    /// comparing the rewritten statement's display form to the expected InfluxQL text.
    #[test]
    fn test_rewrite_statement() {
        // Exact, match
        let stmt = parse_select("SELECT usage_user FROM cpu");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(stmt.to_string(), "SELECT usage_user::float FROM cpu");

        // Rewriting FROM clause

        // Regex, match
        let stmt = parse_select("SELECT bytes_free FROM /d/");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT bytes_free::integer FROM disk, diskio"
        );

        // Exact, no match
        let stmt = parse_select("SELECT usage_idle FROM foo");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert!(stmt.from.is_empty());

        // Regex, no match
        let stmt = parse_select("SELECT bytes_free FROM /^d$/");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert!(stmt.from.is_empty());

        // Rewriting projection list

        // Single wildcard, single measurement
        let stmt = parse_select("SELECT * FROM cpu");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT host::tag, region::tag, usage_idle::float, usage_system::float, usage_user::float FROM cpu"
        );

        let stmt = parse_select("SELECT * FROM cpu, disk");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT bytes_free::integer, bytes_used::integer, host::tag, region::tag, usage_idle::float, usage_system::float, usage_user::float FROM cpu, disk"
        );

        // Regular expression selects fields from multiple measurements
        let stmt = parse_select("SELECT /usage|bytes/ FROM cpu, disk");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT bytes_free::integer, bytes_used::integer, usage_idle::float, usage_system::float, usage_user::float FROM cpu, disk"
        );

        // Selective wildcard for tags
        let stmt = parse_select("SELECT *::tag FROM cpu");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(stmt.to_string(), "SELECT host::tag, region::tag FROM cpu");

        // Selective wildcard for fields
        let stmt = parse_select("SELECT *::field FROM cpu");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT usage_idle::float, usage_system::float, usage_user::float FROM cpu"
        );

        // Mixed fields and wildcards
        let stmt = parse_select("SELECT usage_idle, *::tag FROM cpu");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT usage_idle::float, host::tag, region::tag FROM cpu"
        );

        // GROUP BY expansion
        let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY host");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT usage_idle::float FROM cpu GROUP BY host"
        );

        let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT usage_idle::float FROM cpu GROUP BY host, region"
        );

        // Fallible

        // Invalid regex
        let stmt = parse_select("SELECT usage_idle FROM /(not/");
        let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err();
        assert_contains!(err.to_string(), "invalid regular expression");

        // Subqueries

        // Subquery, exact, match
        let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM cpu)");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT usage_idle::float FROM (SELECT usage_idle::float FROM cpu)"
        );

        // Subquery, regex, match
        let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /d/)");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT bytes_free::integer FROM (SELECT bytes_free::integer FROM disk, diskio)"
        );

        // Subquery, exact, no match
        let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM foo)");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT usage_idle FROM (SELECT usage_idle )"
        );

        // Subquery, regex, no match
        let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /^d$/)");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT bytes_free FROM (SELECT bytes_free )"
        );

        // Binary expression
        let stmt = parse_select("SELECT bytes_free+bytes_used FROM disk");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT bytes_free::integer + bytes_used::integer FROM disk"
        );

        // Call expressions

        let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT COUNT(field_i64::integer) FROM temp_01"
        );

        let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT COUNT(field_f64::float) FROM temp_01"
        );

        // Expands all fields
        let stmt = parse_select("SELECT COUNT(*) FROM temp_01");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64, COUNT(field_str::string) AS COUNT_field_str, COUNT(shared_field0::float) AS COUNT_shared_field0 FROM temp_01"
        );

        // Expands matching fields
        let stmt = parse_select("SELECT COUNT(/64$/) FROM temp_01");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64 FROM temp_01"
        );

        // Expands only numeric fields
        let stmt = parse_select("SELECT SUM(*) FROM temp_01");
        let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
        assert_eq!(
            stmt.to_string(),
            "SELECT SUM(field_f64::float) AS SUM_field_f64, SUM(field_i64::integer) AS SUM_field_i64, SUM(shared_field0::float) AS SUM_shared_field0 FROM temp_01"
        );

        // Fallible cases

        let stmt = parse_select("SELECT *::field + *::tag FROM cpu");
        let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err();
        assert_eq!(
            err.to_string(),
            "External error: unsupported expression: contains a wildcard or regular expression"
        );

        let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu");
        let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err();
        assert_eq!(
            err.to_string(),
            "External error: unable to use tag as wildcard in COUNT()"
        );
    }

    /// Verifies detection of wildcards/regexes in the projection (`.0`) and GROUP BY (`.1`).
    #[test]
    fn test_has_wildcards() {
        // no GROUP BY
        let sel = parse_select("select a from b");
        let res = has_wildcards(&sel);
        assert!(!res.0);
        assert!(!res.1);

        let sel = parse_select("select a from b group by c");
        let res = has_wildcards(&sel);
        assert!(!res.0);
        assert!(!res.1);

        let sel = parse_select("select * from b group by c");
        let res = has_wildcards(&sel);
        assert!(res.0);
        assert!(!res.1);

        let sel = parse_select("select /a/ from b group by c");
        let res = has_wildcards(&sel);
        assert!(res.0);
        assert!(!res.1);

        let sel = parse_select("select a from b group by *");
        let res = has_wildcards(&sel);
        assert!(!res.0);
        assert!(res.1);

        let sel = parse_select("select a from b group by /a/");
        let res = has_wildcards(&sel);
        assert!(!res.0);
        assert!(res.1);

        let sel = parse_select("select * from b group by *");
        let res = has_wildcards(&sel);
        assert!(res.0);
        assert!(res.1);

        let sel = parse_select("select /a/ from b group by /b/");
        let res = has_wildcards(&sel);
        assert!(res.0);
        assert!(res.1);

        // finds wildcard in nested expressions
        let sel = parse_select("select COUNT(*) from b group by *");
        let res = has_wildcards(&sel);
        assert!(res.0);
        assert!(res.1);

        // does not traverse subqueries
        let sel = parse_select("select a from (select * from c group by *) group by c");
        let res = has_wildcards(&sel);
        assert!(!res.0);
        assert!(!res.1);
    }

    /// Snapshot-tests the depth-first, post-order visit order of the immutable walker.
    #[test]
    fn test_walk_expr() {
        // Records each visited node as "<index>: <display form>" and joins with newlines.
        fn walk_expr(s: &str) -> String {
            let expr = get_first_field(format!("SELECT {} FROM f", s).as_str()).expr;
            let mut calls = Vec::new();
            let mut call_no = 0;
            super::walk_expr(&expr, &mut |n| {
                calls.push(format!("{}: {}", call_no, n));
                call_no += 1;
                Ok(())
            })
            .unwrap();
            calls.join("\n")
        }

        insta::assert_display_snapshot!(walk_expr("5 + 6"));
        insta::assert_display_snapshot!(walk_expr("count(5, foo + 7)"));
        insta::assert_display_snapshot!(walk_expr("count(5, foo + 7) + sum(bar)"));
    }

    /// Snapshot-tests that the mutable walker visits nodes in the same order as the
    /// immutable one.
    #[test]
    fn test_walk_expr_mut() {
        // Records each visited node as "<index>: <display form>" and joins with newlines.
        fn walk_expr_mut(s: &str) -> String {
            let mut expr = get_first_field(format!("SELECT {} FROM f", s).as_str()).expr;
            let mut calls = Vec::new();
            let mut call_no = 0;
            super::walk_expr_mut(&mut expr, &mut |n| {
                calls.push(format!("{}: {}", call_no, n));
                call_no += 1;
                Ok(())
            })
            .unwrap();
            calls.join("\n")
        }

        insta::assert_display_snapshot!(walk_expr_mut("5 + 6"));
        insta::assert_display_snapshot!(walk_expr_mut("count(5, foo + 7)"));
        insta::assert_display_snapshot!(walk_expr_mut("count(5, foo + 7) + sum(bar)"));
    }

    /// Verifies that the mutable walker can rewrite variable references and literals in place.
    #[test]
    fn test_walk_expr_mut_modify() {
        let mut expr = get_first_field("SELECT foo + bar + 5 FROM f").expr;
        walk_expr_mut(&mut expr, &mut |e| {
            match e {
                Expr::VarRef { name, .. } => *name = format!("c_{}", name).into(),
                Expr::Literal(Literal::Unsigned(v)) => *v *= 10,
                _ => {}
            }
            Ok(())
        })
        .unwrap();
        assert_eq!(format!("{}", expr), "c_foo + c_bar + 50")
    }
}

View File

@ -0,0 +1,9 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr(\"count(5, foo + 7)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)

View File

@ -0,0 +1,12 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr(\"count(5, foo + 7) + sum(bar)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)
5: bar
6: sum(bar)
7: count(5, foo + 7) + sum(bar)

View File

@ -0,0 +1,7 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr(\"5 + 6\")"
---
0: 5
1: 6
2: 5 + 6

View File

@ -0,0 +1,9 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr_mut(\"count(5, foo + 7)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)

View File

@ -0,0 +1,12 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr_mut(\"count(5, foo + 7) + sum(bar)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)
5: bar
6: sum(bar)
7: count(5, foo + 7) + sum(bar)

View File

@ -0,0 +1,7 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr_mut(\"5 + 6\")"
---
0: 5
1: 6
2: 5 + 6

View File

@ -0,0 +1,166 @@
//! APIs for testing.
#![cfg(test)]
use crate::test::TestChunk;
use crate::QueryChunkMeta;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use influxdb_influxql_parser::parse_statements;
use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::statement::Statement;
use itertools::Itertools;
use std::any::Any;
use std::sync::Arc;
/// A [`TableProvider`] that exposes a fixed Arrow schema but cannot be scanned.
///
/// Used by tests that only need schema information for planning.
struct EmptyTable {
    /// The Arrow schema reported by [`TableProvider::schema`].
    table_schema: SchemaRef,
}
impl EmptyTable {
    /// Create a new `EmptyTable` that reports the given Arrow schema.
    pub(crate) fn new(table_schema: SchemaRef) -> Self {
        Self { table_schema }
    }
}
/// Returns the first `Field` of the `SELECT` statement.
///
/// Panics if `s` does not parse to a `SELECT` with at least one projection.
pub(crate) fn get_first_field(s: &str) -> Field {
    let stmt = parse_select(s);
    let first = stmt.fields.head().unwrap();
    first.clone()
}
/// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`.
///
/// Panics if `s` fails to parse or the first statement is not a `SELECT`.
pub(crate) fn parse_select(s: &str) -> SelectStatement {
    let statements = parse_statements(s).unwrap();
    if let Some(Statement::Select(sel)) = statements.first() {
        *sel.clone()
    } else {
        panic!("expected SELECT statement")
    }
}
#[async_trait]
impl TableProvider for EmptyTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Scanning is intentionally unimplemented: this provider exists only to expose a
    /// schema to the planner in tests and must never be executed.
    async fn scan(
        &self,
        _ctx: &SessionState,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }
}
/// A [`SchemaProvider`] backed by a fixed set of in-memory test measurement schemas.
pub(crate) struct MockSchemaProvider {}

impl MockSchemaProvider {
    /// Convenience constructor to return a new instance of [`Self`] as a dynamic [`SchemaProvider`].
    pub(crate) fn new_schema_provider() -> Arc<dyn SchemaProvider> {
        Arc::new(Self {})
    }
}
impl SchemaProvider for MockSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the sorted list of measurement names known to this provider.
    fn table_names(&self) -> Vec<String> {
        vec![
            "cpu".into(),
            "disk".into(),
            "diskio".into(),
            "temp_01".into(),
            // BUG FIX: the list previously contained "temp_03" twice and omitted "temp_02",
            // even though `table` resolves "temp_02"; keep both lists consistent so
            // `table_exist("temp_02")` agrees with `table("temp_02")`.
            "temp_02".into(),
            "temp_03".into(),
        ]
        .into_iter()
        .sorted()
        .collect::<Vec<_>>()
    }

    /// Returns an [`EmptyTable`] exposing the Arrow schema of the named test measurement,
    /// or `None` when the name is unknown.
    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        let schema = match name {
            "cpu" => Some(
                TestChunk::new("cpu")
                    .with_tag_column("host")
                    .with_tag_column("region")
                    .with_f64_field_column("usage_user")
                    .with_f64_field_column("usage_system")
                    .with_f64_field_column("usage_idle")
                    .schema(),
            ),
            "disk" => Some(
                TestChunk::new("disk")
                    .with_tag_column("host")
                    .with_tag_column("region")
                    .with_i64_field_column("bytes_used")
                    .with_i64_field_column("bytes_free")
                    .schema(),
            ),
            "diskio" => Some(
                TestChunk::new("diskio")
                    .with_tag_column("host")
                    .with_tag_column("region")
                    .with_tag_column("status")
                    .with_i64_field_column("bytes_read")
                    .with_i64_field_column("bytes_written")
                    .with_f64_field_column("read_utilization")
                    .with_f64_field_column("write_utilization")
                    .with_bool_field_column("is_local")
                    .schema(),
            ),
            // Schemas for testing merged schemas
            "temp_01" => Some(
                TestChunk::new("temp_01")
                    .with_tag_column("shared_tag0")
                    .with_tag_column("shared_tag1")
                    .with_f64_field_column("shared_field0")
                    .with_f64_field_column("field_f64")
                    .with_i64_field_column("field_i64")
                    .with_string_field_column_with_stats("field_str", None, None)
                    .schema(),
            ),
            "temp_02" => Some(
                TestChunk::new("temp_02")
                    .with_tag_column("shared_tag0")
                    .with_tag_column("shared_tag1")
                    .with_i64_field_column("shared_field0")
                    .schema(),
            ),
            "temp_03" => Some(
                TestChunk::new("temp_03")
                    .with_tag_column("shared_tag0")
                    .with_tag_column("shared_tag1")
                    .with_string_field_column_with_stats("shared_field0", None, None)
                    .schema(),
            ),
            _ => None,
        };

        // Wrap the IOx schema's Arrow representation in a scan-less TableProvider.
        schema.map(|s| Arc::new(EmptyTable::new(Arc::clone(s.inner()))) as Arc<dyn TableProvider>)
    }

    fn table_exist(&self, name: &str) -> bool {
        // Compare against `&str` directly to avoid allocating a `String` per lookup.
        self.table_names().iter().any(|n| n == name)
    }
}

View File

@ -0,0 +1,82 @@
use influxdb_influxql_parser::expression::VarRefDataType;
use schema::InfluxFieldType;
/// Maps a [`VarRefDataType`] to an [`InfluxFieldType`], or `None` if no such mapping exists.
#[allow(dead_code)]
pub(crate) fn var_ref_data_type_to_field_type(v: VarRefDataType) -> Option<InfluxFieldType> {
    match v {
        VarRefDataType::Float => Some(InfluxFieldType::Float),
        VarRefDataType::Integer => Some(InfluxFieldType::Integer),
        VarRefDataType::Unsigned => Some(InfluxFieldType::UInteger),
        VarRefDataType::String => Some(InfluxFieldType::String),
        VarRefDataType::Boolean => Some(InfluxFieldType::Boolean),
        // Tag references and untyped field specifiers carry no concrete field type.
        VarRefDataType::Tag | VarRefDataType::Field => None,
    }
}
/// Maps an [`InfluxFieldType`] to a [`VarRefDataType`].
///
/// This mapping is total: every InfluxDB field type has a corresponding variable
/// reference data type.
pub(crate) fn field_type_to_var_ref_data_type(v: InfluxFieldType) -> VarRefDataType {
    match v {
        InfluxFieldType::Float => VarRefDataType::Float,
        InfluxFieldType::Integer => VarRefDataType::Integer,
        InfluxFieldType::UInteger => VarRefDataType::Unsigned,
        InfluxFieldType::String => VarRefDataType::String,
        InfluxFieldType::Boolean => VarRefDataType::Boolean,
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    /// Verifies the partial mapping: field-capable types map to `Some`, while
    /// `Tag` and the untyped `Field` specifier map to `None`.
    #[test]
    fn test_var_ref_data_type_to_field_type() {
        assert_matches!(
            var_ref_data_type_to_field_type(VarRefDataType::Float),
            Some(InfluxFieldType::Float)
        );
        assert_matches!(
            var_ref_data_type_to_field_type(VarRefDataType::Integer),
            Some(InfluxFieldType::Integer)
        );
        assert_matches!(
            var_ref_data_type_to_field_type(VarRefDataType::Unsigned),
            Some(InfluxFieldType::UInteger)
        );
        assert_matches!(
            var_ref_data_type_to_field_type(VarRefDataType::String),
            Some(InfluxFieldType::String)
        );
        assert_matches!(
            var_ref_data_type_to_field_type(VarRefDataType::Boolean),
            Some(InfluxFieldType::Boolean)
        );
        assert!(var_ref_data_type_to_field_type(VarRefDataType::Field).is_none());
        assert!(var_ref_data_type_to_field_type(VarRefDataType::Tag).is_none());
    }

    /// Verifies the total mapping from every InfluxDB field type to its
    /// variable-reference data type.
    #[test]
    fn test_field_type_to_var_ref_data_type() {
        assert_matches!(
            field_type_to_var_ref_data_type(InfluxFieldType::Float),
            VarRefDataType::Float
        );
        assert_matches!(
            field_type_to_var_ref_data_type(InfluxFieldType::Integer),
            VarRefDataType::Integer
        );
        assert_matches!(
            field_type_to_var_ref_data_type(InfluxFieldType::UInteger),
            VarRefDataType::Unsigned
        );
        assert_matches!(
            field_type_to_var_ref_data_type(InfluxFieldType::String),
            VarRefDataType::String
        );
        assert_matches!(
            field_type_to_var_ref_data_type(InfluxFieldType::Boolean),
            VarRefDataType::Boolean
        );
    }
}