fix: add support for rewriting field column

pull/24376/head
Edd Robinson 2021-10-20 11:29:35 +01:00
parent 7d86d1144e
commit a0ba75576e
1 changed files with 117 additions and 0 deletions

View File

@ -10,9 +10,11 @@ use datafusion::{
error::{DataFusionError, Result as DatafusionResult},
logical_plan::{
binary_expr, lit, Column, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder,
Operator,
},
optimizer::utils::expr_to_columns,
prelude::col,
scalar::ScalarValue,
};
use datafusion_util::AsExpr;
@ -49,6 +51,13 @@ use crate::{
/// filled in with a literal for the respective name of that field
pub const MEASUREMENT_COLUMN_NAME: &str = "_measurement";
/// Any equality expressions using this column name are removed and replaced
/// with projections on the specified column.
///
/// This is required to support predicates like
/// `_field` = temperature
pub const FIELD_COLUMN_NAME: &str = "_field";
/// Any column references to this name are rewritten to be a disjunctive set of
/// expressions to all field columns for the table schema.
///
@ -1657,13 +1666,27 @@ struct TableNormalizedPredicate {
impl TableNormalizedPredicate {
fn new(table_name: &str, schema: Arc<Schema>, mut inner: Predicate) -> Self {
let mut field_projections = BTreeSet::new();
inner.exprs = inner
.exprs
.into_iter()
.map(|e| rewrite_measurement_references(table_name, e))
.map(|e| rewrite_field_value_references(Arc::clone(&schema), e))
.map(|e| {
// Rewrite any references to `_field = a_field_name` with a literal true
// and keep track of referenced field names to add to the field
// column projection set.
rewrite_field_column_references(&mut field_projections, e)
})
.collect::<Vec<_>>();
if !field_projections.is_empty() {
match &mut inner.field_columns {
Some(field_columns) => field_columns.extend(field_projections.into_iter()),
None => inner.field_columns = Some(field_projections),
};
}
Self {
inner: Arc::new(inner),
}
@ -1747,6 +1770,53 @@ impl ExprRewriter for FieldValueRewriter {
}
}
/// Rewrites a predicate on `_field` as a projection on a specific defined by
/// the literal in the expression.
///
/// For example, the expression `_field = "load4"` is removed from the
/// normalised expression, and a column "load4" added to the predicate
/// projection.
fn rewrite_field_column_references(
field_projections: &'_ mut BTreeSet<String>,
expr: Expr,
) -> Expr {
let mut rewriter = FieldColumnRewriter { field_projections };
expr.rewrite(&mut rewriter).expect("rewrite is infallible")
}
struct FieldColumnRewriter<'a> {
field_projections: &'a mut BTreeSet<String>,
}
impl<'a> ExprRewriter for FieldColumnRewriter<'a> {
fn mutate(&mut self, expr: Expr) -> DatafusionResult<Expr> {
Ok(match expr {
Expr::BinaryExpr {
ref left,
op,
ref right,
} => {
if let Expr::Column(inner) = &**left {
if inner.name != FIELD_COLUMN_NAME || op != Operator::Eq {
// TODO(edd): add support for !=
return Ok(expr);
}
if let Expr::Literal(ScalarValue::Utf8(Some(name))) = &**right {
self.field_projections.insert(name.to_owned());
return Ok(Expr::Literal(ScalarValue::Boolean(Some(true))));
}
expr
} else {
expr
}
}
_ => expr,
})
}
}
/// Returns true if all columns referred to in `expr` are present in
/// the schema, false otherwise
pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool {
@ -1852,4 +1922,51 @@ mod tests {
.unwrap();
assert_eq!(rewritten, binary_expr(col("f1"), Operator::Gt, lit(1.88)));
}
#[test]
fn test_field_column_rewriter() {
let mut field_columns = BTreeSet::new();
let mut rewriter = FieldColumnRewriter {
field_projections: &mut field_columns,
};
let cases = vec![
(
binary_expr(col("f1"), Operator::Eq, lit(1.82)),
binary_expr(col("f1"), Operator::Eq, lit(1.82)),
vec![],
),
(
// TODO - should be rewritten and project onto all field columns
binary_expr(col(FIELD_COLUMN_NAME), Operator::NotEq, lit("foo")),
binary_expr(col(FIELD_COLUMN_NAME), Operator::NotEq, lit("foo")),
vec![],
),
(
binary_expr(col(FIELD_COLUMN_NAME), Operator::Eq, lit("f1")),
lit(true),
vec!["f1"],
),
(
binary_expr(
binary_expr(col(FIELD_COLUMN_NAME), Operator::Eq, lit("f1")),
Operator::Or,
binary_expr(col(FIELD_COLUMN_NAME), Operator::Eq, lit("f2")),
),
binary_expr(lit(true), Operator::Or, lit(true)),
vec!["f1", "f2"],
),
];
for (input, exp_expr, field_columns) in cases {
let rewritten = input.rewrite(&mut rewriter).unwrap();
assert_eq!(rewritten, exp_expr);
let mut exp_field_columns = field_columns
.into_iter()
.map(String::from)
.collect::<BTreeSet<String>>();
assert_eq!(rewriter.field_projections, &mut exp_field_columns);
}
}
}