refactor: decouple influxql from SchemaProvider (#6507)

* refactor: decouple influxql from SchemaProvider

* refactor: reorder arguments

* refactor: use QueryNamespaceMeta

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2023-01-06 07:36:29 +00:00 committed by GitHub
parent 23807df7a9
commit 2037db7f7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 185 additions and 237 deletions

View File

@ -1,31 +1,32 @@
use crate::plan::influxql::field::field_by_name;
use crate::plan::influxql::field_mapper::FieldMapper;
use crate::plan::influxql::field_mapper::map_type;
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Expr, UnaryOperator, VarRefDataType};
use influxdb_influxql_parser::literal::Literal;
use influxdb_influxql_parser::select::{Dimension, FromMeasurementClause, MeasurementSelection};
use itertools::Itertools;
use predicate::rpc_predicate::QueryNamespaceMeta;
/// Evaluate the type of the specified expression.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4796-L4797).
pub(crate) fn evaluate_type(
namespace: &dyn QueryNamespaceMeta,
expr: &Expr,
from: &FromMeasurementClause,
fm: &dyn FieldMapper,
) -> Result<Option<VarRefDataType>> {
TypeEvaluator::new(from, fm).eval_type(expr)
TypeEvaluator::new(from, namespace).eval_type(expr)
}
struct TypeEvaluator<'a> {
fm: &'a dyn FieldMapper,
namespace: &'a dyn QueryNamespaceMeta,
from: &'a FromMeasurementClause,
}
impl<'a> TypeEvaluator<'a> {
fn new(from: &'a FromMeasurementClause, fm: &'a dyn FieldMapper) -> Self {
Self { from, fm }
fn new(from: &'a FromMeasurementClause, namespace: &'a dyn QueryNamespaceMeta) -> Self {
Self { from, namespace }
}
fn eval_type(&self, expr: &Expr) -> Result<Option<VarRefDataType>> {
@ -82,7 +83,7 @@ impl<'a> TypeEvaluator<'a> {
MeasurementSelection::Name(QualifiedMeasurementName {
name: MeasurementName::Name(ident),
..
}) => match (data_type, self.fm.map_type(ident.as_str(), name)?) {
}) => match (data_type, map_type(self.namespace, ident.as_str(), name)?) {
(Some(existing), Some(res)) => {
if res < existing {
data_type = Some(res)
@ -96,7 +97,7 @@ impl<'a> TypeEvaluator<'a> {
if let Some(field) = field_by_name(select, name) {
match (
data_type,
evaluate_type(&field.expr, &select.from, self.fm)?,
evaluate_type(self.namespace, &field.expr, &select.from)?,
) {
(Some(existing), Some(res)) => {
if res < existing {
@ -154,123 +155,159 @@ impl<'a> TypeEvaluator<'a> {
#[cfg(test)]
mod test {
use crate::plan::influxql::expr_type_evaluator::evaluate_type;
use crate::plan::influxql::field_mapper::{FieldMapper, SchemaFieldMapper};
use crate::plan::influxql::test_utils::{parse_select, MockSchemaProvider};
use crate::plan::influxql::test_utils::{parse_select, MockNamespace};
use assert_matches::assert_matches;
use influxdb_influxql_parser::expression::VarRefDataType;
#[test]
fn test_evaluate_type() {
let fm =
&SchemaFieldMapper::new(MockSchemaProvider::new_schema_provider()) as &dyn FieldMapper;
let namespace = MockNamespace::default();
let stmt = parse_select("SELECT shared_field0 FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
let stmt = parse_select("SELECT shared_tag0 FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Tag);
// Unknown
let stmt = parse_select("SELECT not_exists FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from).unwrap();
assert!(res.is_none());
let stmt = parse_select("SELECT shared_field0 FROM temp_02");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
let stmt = parse_select("SELECT shared_field0 FROM temp_02");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
// Same field across multiple measurements resolves to the highest precedence (float)
let stmt = parse_select("SELECT shared_field0 FROM temp_01, temp_02");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
// Explicit cast of integer field to float
let stmt = parse_select("SELECT SUM(field_i64::float) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
// data types for functions
let stmt = parse_select("SELECT SUM(field_f64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
let stmt = parse_select("SELECT SUM(field_i64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
let stmt = parse_select("SELECT MIN(field_f64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
let stmt = parse_select("SELECT MAX(field_i64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
let stmt = parse_select("SELECT FIRST(field_str) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::String);
let stmt = parse_select("SELECT LAST(field_str) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::String);
let stmt = parse_select("SELECT MEAN(field_i64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
let stmt = parse_select("SELECT COUNT(field_str) FROM temp_01");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Integer);
// subqueries
let stmt = parse_select("SELECT inner FROM (SELECT field_f64 as inner FROM temp_01)");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
let stmt =
parse_select("SELECT inner FROM (SELECT shared_tag0, field_f64 as inner FROM temp_01)");
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Float);
let stmt = parse_select(
"SELECT shared_tag0, inner FROM (SELECT shared_tag0, field_f64 as inner FROM temp_01)",
);
let field = stmt.fields.head().unwrap();
let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap();
let res = evaluate_type(&namespace, &field.expr, &stmt.from)
.unwrap()
.unwrap();
assert_matches!(res, VarRefDataType::Tag);
}
}

View File

@ -1,96 +1,65 @@
#![allow(dead_code)]
use crate::plan::influxql::var_ref::field_type_to_var_ref_data_type;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::common::{DataFusionError, Result};
use datafusion::common::Result;
use influxdb_influxql_parser::expression::VarRefDataType;
use schema::{InfluxColumnType, Schema};
use predicate::rpc_predicate::QueryNamespaceMeta;
use schema::InfluxColumnType;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
pub(crate) type FieldTypeMap = HashMap<String, VarRefDataType>;
pub(crate) type TagSet = HashSet<String>;
/// Represents an InfluxQL schema for determining the fields and tags
/// of a measurement.
pub(crate) trait FieldMapper {
/// Determine the fields and tags for the given measurement name.
fn field_and_dimensions(&self, name: &str) -> Result<Option<(FieldTypeMap, TagSet)>>;
/// Determine the [`VarRefDataType`] for the given field name.
fn map_type(&self, name: &str, field: &str) -> Result<Option<VarRefDataType>>;
}
pub(crate) struct SchemaFieldMapper {
schema: Arc<dyn SchemaProvider>,
}
impl SchemaFieldMapper {
pub(crate) fn new(schema: Arc<dyn SchemaProvider>) -> Self {
Self { schema }
}
fn get_schema(&self, name: &str) -> Result<Option<Schema>> {
Ok(Some(
Schema::try_from(match self.schema.table(name) {
Some(t) => t.schema(),
None => return Ok(None),
})
.map_err(|e| {
DataFusionError::Internal(format!("Unable to create IOx schema: {}", e))
})?,
))
/// Determine the fields and tags of the measurement called `name`.
///
/// Looks up the table schema for `name` in `namespace`. When no schema is
/// found the result is `Ok(None)`. Otherwise the result is a pair of:
///
/// * a [`FieldTypeMap`] keyed by the name of every field column, mapped to
///   the InfluxQL data type derived from the column's field type, and
/// * a [`TagSet`] holding the name of every tag column.
///
/// Columns that are neither fields nor tags do not appear in either
/// collection.
pub(crate) fn field_and_dimensions(
    namespace: &dyn QueryNamespaceMeta,
    name: &str,
) -> Result<Option<(FieldTypeMap, TagSet)>> {
    Ok(namespace.table_schema(name).map(|schema| {
        // Only field columns contribute to the field map; everything else is dropped.
        let fields = schema
            .iter()
            .filter_map(|(col_type, col)| match col_type {
                InfluxColumnType::Field(ft) => {
                    Some((col.name().clone(), field_type_to_var_ref_data_type(ft)))
                }
                _ => None,
            })
            .collect::<FieldTypeMap>();

        let tags = schema
            .tags_iter()
            .map(|col| col.name().clone())
            .collect::<TagSet>();

        (fields, tags)
    }))
}
impl FieldMapper for SchemaFieldMapper {
fn field_and_dimensions(&self, name: &str) -> Result<Option<(FieldTypeMap, TagSet)>> {
match self.get_schema(name)? {
Some(iox) => Ok(Some((
FieldTypeMap::from_iter(iox.iter().filter_map(|(col_type, f)| match col_type {
InfluxColumnType::Field(ft) => {
Some((f.name().clone(), field_type_to_var_ref_data_type(ft)))
}
_ => None,
})),
iox.tags_iter()
.map(|f| f.name().clone())
.collect::<TagSet>(),
))),
None => Ok(None),
}
}
fn map_type(&self, measurement_name: &str, field: &str) -> Result<Option<VarRefDataType>> {
match self.get_schema(measurement_name)? {
Some(iox) => Ok(match iox.find_index_of(field) {
Some(i) => match iox.field(i).0 {
InfluxColumnType::Field(ft) => Some(field_type_to_var_ref_data_type(ft)),
InfluxColumnType::Tag => Some(VarRefDataType::Tag),
InfluxColumnType::Timestamp => None,
},
None => None,
}),
None => Ok(None),
}
/// Determine the [`VarRefDataType`] of the column `field` in the measurement
/// `measurement_name`.
///
/// Returns `Ok(None)` when any of the following holds:
///
/// * `namespace` has no table schema for `measurement_name`,
/// * the schema has no column named `field`, or
/// * the column is the timestamp column.
///
/// Field columns map to the data type derived from their field type and tag
/// columns map to [`VarRefDataType::Tag`].
pub(crate) fn map_type(
    namespace: &dyn QueryNamespaceMeta,
    measurement_name: &str,
    field: &str,
) -> Result<Option<VarRefDataType>> {
    let data_type = namespace
        .table_schema(measurement_name)
        .and_then(|schema| {
            // A missing column short-circuits the closure with `None`.
            let idx = schema.find_index_of(field)?;
            match schema.field(idx).0 {
                InfluxColumnType::Field(ft) => Some(field_type_to_var_ref_data_type(ft)),
                InfluxColumnType::Tag => Some(VarRefDataType::Tag),
                InfluxColumnType::Timestamp => None,
            }
        });

    Ok(data_type)
}
#[cfg(test)]
mod test {
use crate::plan::influxql::field_mapper::{
FieldMapper, FieldTypeMap, SchemaFieldMapper, TagSet,
};
use crate::plan::influxql::test_utils::MockSchemaProvider;
use super::*;
use crate::plan::influxql::test_utils::MockNamespace;
use assert_matches::assert_matches;
use influxdb_influxql_parser::expression::VarRefDataType;
#[test]
fn test_schema_field_mapper() {
let fm =
&SchemaFieldMapper::new(MockSchemaProvider::new_schema_provider()) as &dyn FieldMapper;
let namespace = MockNamespace::default();
// Measurement exists
let (field_set, tag_set) = fm.field_and_dimensions("cpu").unwrap().unwrap();
let (field_set, tag_set) = field_and_dimensions(&namespace, "cpu").unwrap().unwrap();
assert_eq!(
field_set,
FieldTypeMap::from([
@ -105,22 +74,26 @@ mod test {
);
// Measurement does not exist
assert!(fm.field_and_dimensions("cpu2").unwrap().is_none());
assert!(field_and_dimensions(&namespace, "cpu2").unwrap().is_none());
// `map_type` API calls
// Returns expected type
assert_matches!(
fm.map_type("cpu", "usage_user").unwrap(),
map_type(&namespace, "cpu", "usage_user").unwrap(),
Some(VarRefDataType::Float)
);
assert_matches!(
fm.map_type("cpu", "host").unwrap(),
map_type(&namespace, "cpu", "host").unwrap(),
Some(VarRefDataType::Tag)
);
// Returns None for nonexistent field
assert!(fm.map_type("cpu", "nonexistent").unwrap().is_none());
assert!(map_type(&namespace, "cpu", "nonexistent")
.unwrap()
.is_none());
// Returns None for nonexistent measurement
assert!(fm.map_type("nonexistent", "usage").unwrap().is_none());
assert!(map_type(&namespace, "nonexistent", "usage")
.unwrap()
.is_none());
}
}

View File

@ -2,8 +2,7 @@
use crate::plan::influxql::expr_type_evaluator::evaluate_type;
use crate::plan::influxql::field::field_name;
use crate::plan::influxql::field_mapper::{FieldMapper, FieldTypeMap, SchemaFieldMapper, TagSet};
use datafusion::catalog::schema::SchemaProvider;
use crate::plan::influxql::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet};
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Expr, VarRefDataType, WildcardType};
@ -16,11 +15,11 @@ use influxdb_influxql_parser::select::{
use influxdb_influxql_parser::string::Regex;
use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResult};
use itertools::Itertools;
use predicate::rpc_predicate::QueryNamespaceMeta;
use query_functions::clean_non_meta_escapes;
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
fn parse_regex(re: &Regex) -> Result<regex::Regex> {
let pattern = clean_non_meta_escapes(re.as_str());
@ -30,9 +29,8 @@ fn parse_regex(re: &Regex) -> Result<regex::Regex> {
}
/// Recursively expand the `from` clause of `stmt` and any subqueries.
fn rewrite_from(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) -> Result<()> {
fn rewrite_from(namespace: &dyn QueryNamespaceMeta, stmt: &mut SelectStatement) -> Result<()> {
let mut new_from = Vec::new();
let schema = &Arc::clone(&schema);
for ms in stmt.from.iter() {
match ms {
MeasurementSelection::Name(qmn) => match qmn {
@ -40,7 +38,7 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) ->
name: MeasurementName::Name(name),
..
} => {
if schema.table(name).is_some() {
if namespace.table_schema(name).is_some() {
new_from.push(ms.clone())
}
}
@ -49,7 +47,7 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) ->
..
} => {
let re = parse_regex(re)?;
schema
namespace
.table_names()
.into_iter()
.filter(|table| re.is_match(table.as_str()))
@ -64,7 +62,7 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) ->
},
MeasurementSelection::Subquery(q) => {
let mut q = *q.clone();
rewrite_from(&mut q, Arc::clone(schema))?;
rewrite_from(namespace, &mut q)?;
new_from.push(MeasurementSelection::Subquery(Box::new(q)))
}
}
@ -75,12 +73,11 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) ->
/// Determine the merged fields and tags of the `FROM` clause.
fn from_field_and_dimensions(
namespace: &dyn QueryNamespaceMeta,
from: &FromMeasurementClause,
schema: Arc<dyn SchemaProvider>,
) -> Result<(FieldTypeMap, TagSet)> {
let mut fs = FieldTypeMap::new();
let mut ts = TagSet::new();
let fm = &SchemaFieldMapper::new(schema) as &dyn FieldMapper;
for ms in from.deref() {
match ms {
@ -88,7 +85,7 @@ fn from_field_and_dimensions(
name: MeasurementName::Name(name),
..
}) => {
let (field_set, tag_set) = match fm.field_and_dimensions(name.as_str())? {
let (field_set, tag_set) = match field_and_dimensions(namespace, name.as_str())? {
Some(res) => res,
None => continue,
};
@ -111,7 +108,7 @@ fn from_field_and_dimensions(
}
MeasurementSelection::Subquery(select) => {
for f in select.fields.iter() {
let dt = match evaluate_type(&f.expr, &select.from, fm)? {
let dt = match evaluate_type(namespace, &f.expr, &select.from)? {
Some(dt) => dt,
None => continue,
};
@ -238,21 +235,23 @@ pub(crate) fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>
/// underlying schema.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1185).
fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider>) -> Result<()> {
fn rewrite_field_list(
namespace: &dyn QueryNamespaceMeta,
stmt: &mut SelectStatement,
) -> Result<()> {
// Iterate through the `FROM` clause and rewrite any subqueries first.
for ms in stmt.from.iter_mut() {
if let MeasurementSelection::Subquery(subquery) = ms {
rewrite_field_list(subquery, Arc::clone(&schema))?;
rewrite_field_list(namespace, subquery)?;
}
}
// Attempt to rewrite all variable references in the fields with their types, if one
// hasn't been specified.
let fm = &SchemaFieldMapper::new(Arc::clone(&schema)) as &dyn FieldMapper;
stmt.fields.iter_mut().try_for_each(|f| {
walk_expr_mut(&mut f.expr, &mut |e| {
if matches!(e, Expr::VarRef { .. }) {
let new_type = evaluate_type(e.borrow(), &stmt.from, fm)?;
let new_type = evaluate_type(namespace, e.borrow(), &stmt.from)?;
if let Expr::VarRef { data_type, .. } = e {
*data_type = new_type;
@ -267,7 +266,7 @@ fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider
return Ok(());
}
let (field_set, mut tag_set) = from_field_and_dimensions(&stmt.from, Arc::clone(&schema))?;
let (field_set, mut tag_set) = from_field_and_dimensions(namespace, &stmt.from)?;
if !has_group_by_wildcard {
if let Some(group_by) = &stmt.group_by {
@ -396,7 +395,7 @@ fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc<dyn SchemaProvider
Some(Expr::Wildcard(Some(WildcardType::Tag))) => {
return Err(DataFusionError::External(
format!("unable to use tag as wildcard in {}()", name).into(),
))
));
}
Some(Expr::Wildcard(_)) => {
fields
@ -527,12 +526,12 @@ fn rewrite_field_list_aliases(field_list: &mut FieldList) -> Result<()> {
/// Recursively rewrite the specified [`SelectStatement`], expanding any wildcards or regular expressions
/// found in the projection list, `FROM` clause or `GROUP BY` clause.
pub(crate) fn rewrite_statement(
namespace: &dyn QueryNamespaceMeta,
q: &SelectStatement,
schema: Arc<dyn SchemaProvider>,
) -> Result<SelectStatement> {
let mut stmt = q.clone();
rewrite_from(&mut stmt, Arc::clone(&schema))?;
rewrite_field_list(&mut stmt, schema)?;
rewrite_from(namespace, &mut stmt)?;
rewrite_field_list(namespace, &mut stmt)?;
rewrite_field_list_aliases(&mut stmt.fields)?;
Ok(stmt)
@ -541,7 +540,7 @@ pub(crate) fn rewrite_statement(
#[cfg(test)]
mod test {
use crate::plan::influxql::rewriter::{has_wildcards, rewrite_statement, walk_expr_mut};
use crate::plan::influxql::test_utils::{get_first_field, MockSchemaProvider};
use crate::plan::influxql::test_utils::{get_first_field, MockNamespace};
use influxdb_influxql_parser::expression::Expr;
use influxdb_influxql_parser::literal::Literal;
use influxdb_influxql_parser::parse_statements;
@ -559,9 +558,10 @@ mod test {
#[test]
fn test_rewrite_statement() {
let namespace = MockNamespace::default();
// Exact, match
let stmt = parse_select("SELECT usage_user FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_user::float AS usage_user FROM cpu"
@ -569,7 +569,7 @@ mod test {
// Duplicate columns do not have conflicting aliases
let stmt = parse_select("SELECT usage_user, usage_user FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu"
@ -577,7 +577,7 @@ mod test {
// Multiple aliases with no conflicts
let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu"
@ -586,14 +586,14 @@ mod test {
// Multiple aliases with conflicts
let stmt =
parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(stmt.to_string(), "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu");
// Rewriting FROM clause
// Regex, match
let stmt = parse_select("SELECT bytes_free FROM /d/");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer AS bytes_free FROM disk, diskio"
@ -601,26 +601,26 @@ mod test {
// Exact, no match
let stmt = parse_select("SELECT usage_idle FROM foo");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty());
// Regex, no match
let stmt = parse_select("SELECT bytes_free FROM /^d$/");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty());
// Rewriting projection list
// Single wildcard, single measurement
let stmt = parse_select("SELECT * FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
);
let stmt = parse_select("SELECT * FROM cpu, disk");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
@ -628,7 +628,7 @@ mod test {
// Regular expression selects fields from multiple measurements
let stmt = parse_select("SELECT /usage|bytes/ FROM cpu, disk");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
@ -636,7 +636,7 @@ mod test {
// Selective wildcard for tags
let stmt = parse_select("SELECT *::tag FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT host::tag AS host, region::tag AS region FROM cpu"
@ -644,7 +644,7 @@ mod test {
// Selective wildcard for fields
let stmt = parse_select("SELECT *::field FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
@ -652,7 +652,7 @@ mod test {
// Mixed fields and wildcards
let stmt = parse_select("SELECT usage_idle, *::tag FROM cpu");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float AS usage_idle, host::tag AS host, region::tag AS region FROM cpu"
@ -661,14 +661,14 @@ mod test {
// GROUP BY expansion
let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY host");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host"
);
let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host, region"
@ -678,14 +678,14 @@ mod test {
// Invalid regex
let stmt = parse_select("SELECT usage_idle FROM /(not/");
let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err();
let err = rewrite_statement(&namespace, &stmt).unwrap_err();
assert_contains!(err.to_string(), "invalid regular expression");
// Subqueries
// Subquery, exact, match
let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM cpu)");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle::float AS usage_idle FROM (SELECT usage_idle::float FROM cpu)"
@ -693,7 +693,7 @@ mod test {
// Subquery, regex, match
let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /d/)");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer AS bytes_free FROM (SELECT bytes_free::integer FROM disk, diskio)"
@ -701,7 +701,7 @@ mod test {
// Subquery, exact, no match
let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM foo)");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT usage_idle AS usage_idle FROM (SELECT usage_idle )"
@ -709,7 +709,7 @@ mod test {
// Subquery, regex, no match
let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /^d$/)");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free AS bytes_free FROM (SELECT bytes_free )"
@ -717,7 +717,7 @@ mod test {
// Binary expression
let stmt = parse_select("SELECT bytes_free+bytes_used FROM disk");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk"
@ -725,7 +725,7 @@ mod test {
// Unary expressions
let stmt = parse_select("SELECT -bytes_free FROM disk");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT -bytes_free::integer AS bytes_free FROM disk"
@ -734,7 +734,7 @@ mod test {
// Call expressions
let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_i64::integer) AS COUNT FROM temp_01"
@ -742,14 +742,14 @@ mod test {
// Duplicate aggregate columns
let stmt = parse_select("SELECT COUNT(field_i64), COUNT(field_i64) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_i64::integer) AS COUNT, COUNT(field_i64::integer) AS COUNT_1 FROM temp_01"
);
let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_f64::float) AS COUNT FROM temp_01"
@ -757,7 +757,7 @@ mod test {
// Expands all fields
let stmt = parse_select("SELECT COUNT(*) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64, COUNT(field_str::string) AS COUNT_field_str, COUNT(shared_field0::float) AS COUNT_shared_field0 FROM temp_01"
@ -765,7 +765,7 @@ mod test {
// Expands matching fields
let stmt = parse_select("SELECT COUNT(/64$/) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64 FROM temp_01"
@ -773,7 +773,7 @@ mod test {
// Expands only numeric fields
let stmt = parse_select("SELECT SUM(*) FROM temp_01");
let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT SUM(field_f64::float) AS SUM_field_f64, SUM(field_i64::integer) AS SUM_field_i64, SUM(shared_field0::float) AS SUM_shared_field0 FROM temp_01"
@ -782,14 +782,14 @@ mod test {
// Fallible cases
let stmt = parse_select("SELECT *::field + *::tag FROM cpu");
let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err();
let err = rewrite_statement(&namespace, &stmt).unwrap_err();
assert_eq!(
err.to_string(),
"External error: unsupported expression: contains a wildcard or regular expression"
);
let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu");
let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err();
let err = rewrite_statement(&namespace, &stmt).unwrap_err();
assert_eq!(
err.to_string(),
"External error: unable to use tag as wildcard in COUNT()"

View File

@ -3,31 +3,13 @@
use crate::test::TestChunk;
use crate::QueryChunkMeta;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use influxdb_influxql_parser::parse_statements;
use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::statement::Statement;
use itertools::Itertools;
use std::any::Any;
use predicate::rpc_predicate::QueryNamespaceMeta;
use schema::Schema;
use std::sync::Arc;
struct EmptyTable {
table_schema: SchemaRef,
}
impl EmptyTable {
pub(crate) fn new(table_schema: SchemaRef) -> Self {
Self { table_schema }
}
}
/// Returns the first `Field` of the `SELECT` statement.
pub(crate) fn get_first_field(s: &str) -> Field {
parse_select(s).fields.head().unwrap().clone()
@ -42,42 +24,13 @@ pub(crate) fn parse_select(s: &str) -> SelectStatement {
}
}
#[async_trait]
impl TableProvider for EmptyTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_ctx: &SessionState,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
/// Test double for a namespace, backed by a fixed list of [`TestChunk`]s.
///
/// Table names and table schemas reported by this type are read from the
/// chunks it holds.
pub(crate) struct MockNamespace {
/// The chunks that make up the test database; each chunk supplies a table
/// name and a schema.
chunks: Vec<TestChunk>,
}
pub(crate) struct MockSchemaProvider {}
impl MockSchemaProvider {
/// Convenience constructor to return a new instance of [`Self`] as a dynamic [`SchemaProvider`].
pub(crate) fn new_schema_provider() -> Arc<dyn SchemaProvider> {
Arc::new(Self {})
}
/// Return the chunks that make up the test database.
pub(crate) fn table_chunks() -> Vec<TestChunk> {
vec![
impl Default for MockNamespace {
fn default() -> Self {
let chunks = vec![
TestChunk::new("cpu")
.with_quiet()
.with_tag_column("host")
@ -120,36 +73,21 @@ impl MockSchemaProvider {
.with_tag_column("shared_tag0")
.with_tag_column("shared_tag1")
.with_string_field_column_with_stats("shared_field0", None, None),
]
];
Self { chunks }
}
}
impl SchemaProvider for MockSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
impl QueryNamespaceMeta for MockNamespace {
fn table_names(&self) -> Vec<String> {
Self::table_chunks()
self.chunks
.iter()
.map(|c| c.table_name().into())
.sorted()
.collect::<Vec<_>>()
.map(|x| x.table_name().to_string())
.collect()
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let schema = Self::table_chunks()
.iter()
.find(|c| c.table_name() == name)
.map(|c| c.schema());
match schema {
Some(s) => Some(Arc::new(EmptyTable::new(Arc::clone(s.inner())))),
None => None,
}
}
fn table_exist(&self, name: &str) -> bool {
self.table_names().contains(&name.to_string())
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
let c = self.chunks.iter().find(|x| x.table_name() == table_name)?;
Some(c.schema())
}
}