fix: support InfluxRPC OR-chains w/ arbitrary child nodes (#6343)
* fix: support InfluxRPC OR-chains w/ arbitrary child nodes Also convert another assertion regarding child nodes of Eq-nodes into a proper error. See https://github.com/influxdata/idpe/issues/16582 . * test: more testspull/24376/head
parent
abe60ee0dc
commit
7e43fe57fa
|
@ -318,8 +318,9 @@ fn convert_simple_node(
|
|||
}
|
||||
|
||||
// If no special case applies, fall back to generic conversion
|
||||
let expr = convert_node_to_expr(node)?;
|
||||
builder.inner = builder.inner.with_expr(expr);
|
||||
if let Some(expr) = convert_node_to_expr(node)? {
|
||||
builder.inner = builder.inner.with_expr(expr);
|
||||
}
|
||||
|
||||
Ok(builder)
|
||||
}
|
||||
|
@ -351,11 +352,11 @@ struct InList {
|
|||
}
|
||||
|
||||
impl TryFrom<&RPCNode> for InList {
|
||||
type Error = &'static str;
|
||||
type Error = String;
|
||||
|
||||
/// If node represents an OR tree like (expr = option1) OR (expr=option2)...
|
||||
/// extracts an InList like expr IN (option1, option2)
|
||||
fn try_from(node: &RPCNode) -> Result<Self, &'static str> {
|
||||
fn try_from(node: &RPCNode) -> Result<Self, String> {
|
||||
InListBuilder::default().append(node)?.build()
|
||||
}
|
||||
}
|
||||
|
@ -381,30 +382,34 @@ impl InListBuilder {
|
|||
///
|
||||
/// For example, if we are at self OR (foo = 'bar') and self.lhs
|
||||
/// is foo, will add 'bar' to value_list
|
||||
fn append(self, node: &RPCNode) -> Result<Self, &'static str> {
|
||||
fn append(self, node: &RPCNode) -> Result<Self, String> {
|
||||
// lhs = rhs
|
||||
if Some(RPCValue::Comparison(RPCComparison::Equal as i32)) == node.value {
|
||||
assert_eq!(node.children.len(), 2);
|
||||
if node.children.len() != 2 {
|
||||
return Err(format!(
|
||||
"Eq nodes should have 2 children but found {}",
|
||||
node.children.len()
|
||||
));
|
||||
}
|
||||
let lhs = &node.children[0];
|
||||
let rhs = &node.children[1];
|
||||
self.append_equal(lhs, rhs)
|
||||
}
|
||||
// lhs OR rhs
|
||||
else if Some(RPCValue::Logical(RPCLogical::Or as i32)) == node.value {
|
||||
assert_eq!(node.children.len(), 2);
|
||||
|
||||
let lhs = &node.children[0];
|
||||
let rhs = &node.children[1];
|
||||
|
||||
// recurse down both sides
|
||||
self.append(lhs).and_then(|s| s.append(rhs))
|
||||
node.children
|
||||
.iter()
|
||||
.fold(Ok(self), |res, node| res.and_then(|this| this.append(node)))
|
||||
} else {
|
||||
Err("Found something other than equal or OR")
|
||||
Err(format!(
|
||||
"Found something other than equal or OR: {:?}",
|
||||
node.value
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// append lhs = rhs expression, if possible, return None if not
|
||||
fn append_equal(mut self, lhs: &RPCNode, rhs: &RPCNode) -> Result<Self, &'static str> {
|
||||
fn append_equal(mut self, lhs: &RPCNode, rhs: &RPCNode) -> Result<Self, String> {
|
||||
let mut in_list = self
|
||||
.inner
|
||||
.take()
|
||||
|
@ -417,16 +422,17 @@ impl InListBuilder {
|
|||
self.inner = Some(in_list);
|
||||
Ok(self)
|
||||
} else {
|
||||
Err("lhs did not match")
|
||||
Err("lhs did not match".to_owned())
|
||||
}
|
||||
} else {
|
||||
Err("rhs wasn't a string")
|
||||
Err("rhs wasn't a string".to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
// consume self and return the built InList
|
||||
fn build(self) -> Result<InList, &'static str> {
|
||||
self.inner.ok_or("No sub expressions found")
|
||||
fn build(self) -> Result<InList, String> {
|
||||
self.inner
|
||||
.ok_or_else(|| "No sub expressions found".to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -468,7 +474,7 @@ impl TryFrom<Vec<u8>> for DecodedTagKey {
|
|||
// String.
|
||||
|
||||
// converts a Node from the RPC layer into a datafusion logical expr
|
||||
fn convert_node_to_expr(node: RPCNode) -> Result<Expr> {
|
||||
fn convert_node_to_expr(node: RPCNode) -> Result<Option<Expr>> {
|
||||
let RPCNode {
|
||||
children,
|
||||
node_type: _,
|
||||
|
@ -476,7 +482,7 @@ fn convert_node_to_expr(node: RPCNode) -> Result<Expr> {
|
|||
} = node;
|
||||
let inputs = children
|
||||
.into_iter()
|
||||
.map(convert_node_to_expr)
|
||||
.flat_map(|c| convert_node_to_expr(c).transpose())
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let value = value.expect("Normalization removed all None values");
|
||||
|
@ -484,7 +490,7 @@ fn convert_node_to_expr(node: RPCNode) -> Result<Expr> {
|
|||
}
|
||||
|
||||
// Builds an Expr given the Value and the converted children
|
||||
fn build_node(value: RPCValue, inputs: Vec<Expr>) -> Result<Expr> {
|
||||
fn build_node(value: RPCValue, inputs: Vec<Expr>) -> Result<Option<Expr>> {
|
||||
// Only logical / comparison ops can have inputs.
|
||||
let can_have_children = matches!(&value, RPCValue::Logical(_) | RPCValue::Comparison(_));
|
||||
|
||||
|
@ -493,16 +499,16 @@ fn build_node(value: RPCValue, inputs: Vec<Expr>) -> Result<Expr> {
|
|||
}
|
||||
|
||||
match value {
|
||||
RPCValue::StringValue(s) => Ok(lit(s)),
|
||||
RPCValue::BoolValue(b) => Ok(lit(b)),
|
||||
RPCValue::IntValue(v) => Ok(lit(v)),
|
||||
RPCValue::UintValue(v) => Ok(lit(v)),
|
||||
RPCValue::FloatValue(f) => Ok(lit(f)),
|
||||
RPCValue::RegexValue(pattern) => Ok(lit(pattern)),
|
||||
RPCValue::TagRefValue(tag_name) => build_tag_ref(tag_name),
|
||||
RPCValue::FieldRefValue(field_name) => Ok(field_name.as_expr()),
|
||||
RPCValue::StringValue(s) => Ok(Some(lit(s))),
|
||||
RPCValue::BoolValue(b) => Ok(Some(lit(b))),
|
||||
RPCValue::IntValue(v) => Ok(Some(lit(v))),
|
||||
RPCValue::UintValue(v) => Ok(Some(lit(v))),
|
||||
RPCValue::FloatValue(f) => Ok(Some(lit(f))),
|
||||
RPCValue::RegexValue(pattern) => Ok(Some(lit(pattern))),
|
||||
RPCValue::TagRefValue(tag_name) => build_tag_ref(tag_name).map(Some),
|
||||
RPCValue::FieldRefValue(field_name) => Ok(Some(field_name.as_expr())),
|
||||
RPCValue::Logical(logical) => build_logical_node(logical, inputs),
|
||||
RPCValue::Comparison(comparison) => build_comparison_node(comparison, inputs),
|
||||
RPCValue::Comparison(comparison) => build_comparison_node(comparison, inputs).map(Some),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -537,14 +543,22 @@ fn build_tag_ref(tag_name: Vec<u8>) -> Result<Expr> {
|
|||
}
|
||||
|
||||
/// Creates an expr from a "Logical" Node
|
||||
fn build_logical_node(logical: i32, inputs: Vec<Expr>) -> Result<Expr> {
|
||||
fn build_logical_node(logical: i32, inputs: Vec<Expr>) -> Result<Option<Expr>> {
|
||||
let logical_enum = RPCLogical::from_i32(logical);
|
||||
|
||||
match logical_enum {
|
||||
Some(RPCLogical::And) => build_binary_expr(Operator::And, inputs),
|
||||
Some(RPCLogical::Or) => build_binary_expr(Operator::Or, inputs),
|
||||
None => UnknownLogicalNodeSnafu { logical }.fail(),
|
||||
let op = match logical_enum {
|
||||
Some(RPCLogical::And) => Operator::And,
|
||||
Some(RPCLogical::Or) => Operator::Or,
|
||||
None => UnknownLogicalNodeSnafu { logical }.fail()?,
|
||||
};
|
||||
|
||||
if inputs.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(inputs
|
||||
.into_iter()
|
||||
.reduce(|left, right| binary_expr(left, op, right)))
|
||||
}
|
||||
|
||||
/// Creates an expr from a "Comparison" Node
|
||||
|
@ -881,6 +895,7 @@ mod tests {
|
|||
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
|
||||
use schema::{Schema, SchemaBuilder};
|
||||
use std::{collections::BTreeSet, sync::Arc};
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -1232,6 +1247,215 @@ mod tests {
|
|||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_predicate_multiple_field_selection_flat_node1() {
|
||||
let selection = make_or_node3(
|
||||
make_field_ref_node("field1"),
|
||||
make_field_ref_node("field2"),
|
||||
make_field_ref_node("field3"),
|
||||
);
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
assert!(predicate.exprs.is_empty());
|
||||
assert_eq!(
|
||||
predicate.field_columns,
|
||||
Some(to_set(&["field1", "field2", "field3"]))
|
||||
);
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_predicate_multiple_field_selection_flat_node2() {
|
||||
let (comparison, expected_expr) = make_host_comparison();
|
||||
let selection = make_or_node3(comparison.clone(), comparison.clone(), comparison);
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
let expected_expr = expected_expr;
|
||||
|
||||
let converted_expr = &predicate.exprs;
|
||||
|
||||
assert_eq!(
|
||||
&expected_expr, converted_expr,
|
||||
"expected '{:#?}' doesn't match actual '{:#?}'",
|
||||
expected_expr, converted_expr
|
||||
);
|
||||
assert_eq!(predicate.field_columns, None,);
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_predicate_multiple_field_selection_node_without_children() {
|
||||
let selection = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![],
|
||||
value: Some(RPCValue::Logical(RPCLogical::Or as i32)),
|
||||
};
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
assert!(predicate.exprs.is_empty());
|
||||
assert_eq!(predicate.field_columns, None,);
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_predicate_multiple_field_selection_node_one_child() {
|
||||
let selection = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![make_field_ref_node("field1")],
|
||||
value: Some(RPCValue::Logical(RPCLogical::Or as i32)),
|
||||
};
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
assert!(predicate.exprs.is_empty());
|
||||
assert_eq!(predicate.field_columns, Some(to_set(&["field1"])));
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_and_no_children() {
|
||||
let selection = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![],
|
||||
value: Some(RPCValue::Logical(RPCLogical::And as i32)),
|
||||
};
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
assert!(predicate.exprs.is_empty());
|
||||
assert_eq!(predicate.field_columns, None,);
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_and_one_child() {
|
||||
let (node, expr) = make_host_comparison();
|
||||
|
||||
let selection = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![node],
|
||||
value: Some(RPCValue::Logical(RPCLogical::And as i32)),
|
||||
};
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
let converted_expr = &predicate.exprs;
|
||||
|
||||
assert_eq!(
|
||||
&expr, converted_expr,
|
||||
"expected '{:#?}' doesn't match actual '{:#?}'",
|
||||
expr, converted_expr
|
||||
);
|
||||
assert_eq!(predicate.field_columns, None,);
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_and_three_children() {
|
||||
let (node, expr) = make_host_comparison();
|
||||
|
||||
let selection = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![node.clone(), node.clone(), node],
|
||||
value: Some(RPCValue::Logical(RPCLogical::And as i32)),
|
||||
};
|
||||
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(selection),
|
||||
};
|
||||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
assert!(predicate.table_names().is_none());
|
||||
|
||||
let predicate = table_predicate(predicate);
|
||||
|
||||
let converted_expr = &predicate.exprs;
|
||||
|
||||
let expected_expr = (0..3)
|
||||
.flat_map(|_| expr.clone().into_iter())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(
|
||||
&expected_expr, converted_expr,
|
||||
"expected '{:#?}' doesn't match actual '{:#?}'",
|
||||
expected_expr, converted_expr
|
||||
);
|
||||
assert_eq!(predicate.field_columns, None,);
|
||||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
// test multiple field restrictions and a general predicate
|
||||
#[test]
|
||||
fn test_convert_predicate_multiple_field_selection_and_predicate() {
|
||||
|
@ -1287,6 +1511,34 @@ mod tests {
|
|||
assert!(predicate.range.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_err_eq_not_two_children() {
|
||||
let iconst = RPCNode {
|
||||
node_type: RPCNodeType::Literal as i32,
|
||||
children: vec![],
|
||||
value: Some(RPCValue::StringValue("h".into())),
|
||||
};
|
||||
|
||||
for n_children in [0, 3] {
|
||||
let rpc_predicate = RPCPredicate {
|
||||
root: Some(RPCNode {
|
||||
node_type: RPCNodeType::ComparisonExpression as i32,
|
||||
children: (0..n_children).map(|_| iconst.clone()).collect(),
|
||||
value: Some(RPCValue::Comparison(RPCComparison::Equal as i32)),
|
||||
}),
|
||||
};
|
||||
|
||||
let err = InfluxRpcPredicateBuilder::default()
|
||||
.rpc_predicate(Some(rpc_predicate))
|
||||
.unwrap_err();
|
||||
|
||||
assert_contains!(
|
||||
err.to_string(),
|
||||
"Unsupported number of children in binary operator Eq"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// make a _f = 'field_name' type node
|
||||
fn make_field_ref_node(field_name: impl Into<String>) -> RPCNode {
|
||||
make_tag_ref_node(TAG_KEY_FIELD, field_name)
|
||||
|
@ -1350,6 +1602,15 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
/// make n1 OR n2 OR n3
|
||||
fn make_or_node3(n1: RPCNode, n2: RPCNode, n3: RPCNode) -> RPCNode {
|
||||
RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![n1, n2, n3],
|
||||
value: Some(RPCValue::Logical(RPCLogical::Or as i32)),
|
||||
}
|
||||
}
|
||||
|
||||
/// make n1 AND n2
|
||||
fn make_and_node(n1: RPCNode, n2: RPCNode) -> RPCNode {
|
||||
RPCNode {
|
||||
|
|
Loading…
Reference in New Issue