From 44eb3b994dc040a0a946728771a373f21044858a Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Sep 2021 16:35:51 +0200 Subject: [PATCH 1/3] feat: `Predicate` serialization Closes #2493. --- Cargo.lock | 1 + generated_types/build.rs | 1 + .../influxdata/iox/catalog/v1/predicate.proto | 81 ++++++ predicate/Cargo.toml | 1 + predicate/src/lib.rs | 1 + predicate/src/serialize.rs | 263 ++++++++++++++++++ 6 files changed, 348 insertions(+) create mode 100644 generated_types/protos/influxdata/iox/catalog/v1/predicate.proto create mode 100644 predicate/src/serialize.rs diff --git a/Cargo.lock b/Cargo.lock index c45ed75024..bd2cbdfec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3150,6 +3150,7 @@ dependencies = [ "data_types", "datafusion 0.1.0", "datafusion_util", + "generated_types", "internal_types", "observability_deps", "regex", diff --git a/generated_types/build.rs b/generated_types/build.rs index eb399e9ca3..276e289e71 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -39,6 +39,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { idpe_path.join("source.proto"), catalog_path.join("catalog.proto"), catalog_path.join("parquet_metadata.proto"), + catalog_path.join("predicate.proto"), management_path.join("database_rules.proto"), management_path.join("chunk.proto"), management_path.join("partition.proto"), diff --git a/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto b/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto new file mode 100644 index 0000000000..08e2ff4393 --- /dev/null +++ b/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto @@ -0,0 +1,81 @@ +syntax = "proto3"; +package influxdata.iox.catalog.v1; + +// Represents a parsed predicate for evaluation by the InfluxDB IOx query engine. +message Predicate { + // Optional table restriction. If present, restricts the results to only tables these tables. + OptionalStringSet table_names = 1; + + // Optional field restriction. If present, restricts the results to only tables which have *at least one* of the + // fields in field_columns. + OptionalStringSet field_columns = 2; + + // Optional partition key filter + OptionalString partition_key = 3; + + // Optional timestamp range: only rows within this range are included in results. Other rows are excluded. + TimestampRange range = 4; + + // Optional arbitrary predicates, represented as list of expressions applied a logical conjunction (aka they are + // 'AND'ed together). Only rows that evaluate to TRUE for all these expressions should be returned. Other rows are + // excluded from the results. + repeated Expr exprs = 5; +} + +// A optional string set. +// +// This is used instead of a `repeated string` to differenctiate between "empty set" and "none". +message OptionalStringSet { + repeated string values = 1; +} + +// An optional string. +message OptionalString { + string value = 1; +} + +// Specifies a continuous range of nanosecond timestamps. +message TimestampRange { + // Start defines the inclusive lower bound. + int64 start = 1; + + // End defines the exclusive upper bound. + int64 end = 2; +} + +// Single expression to be used as parts of a predicate. +// +// Only very simple expression of the type ` ` are supported. +message Expr { + // Column (w/o table name). + string column = 1; + + // Operator. + Op op = 2; + + // Scalar value. + Scalar scalar = 3; +} + +// Binary operator that can be evaluated on a column and a scalar value. +enum Op { + // Unspecified operator, will result in an error. + OP_UNSPECIFIED = 0; + + // Strict equality (`=`). + OP_EQ = 1; + + // Inequality (`!=`). + OP_NE = 2; +} + +// Scalar value of a certain type. +message Scalar { + oneof value { + bool value_bool = 1; + int64 value_i64 = 2; + uint64 value_u64 = 3; + double value_f64 = 4; + string value_string = 5; + } +} diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index 768d45ef02..76dfd0975f 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -9,6 +9,7 @@ chrono = "0.4" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } +generated_types = { path = "../generated_types" } internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } regex = "1" diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index bf0b17e4d5..0d433d12a1 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -1,2 +1,3 @@ pub mod predicate; pub mod regex; +pub mod serialize; diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs new file mode 100644 index 0000000000..037abf41f0 --- /dev/null +++ b/predicate/src/serialize.rs @@ -0,0 +1,263 @@ +//! Code to serialize and deserialize certain expressions. +//! +//! Note that [Ballista] also provides a serialization using [Protocol Buffers 3]. However the protocol is meant as a +//! communication channel between workers and clients of Ballista, not for long term preservation. For IOx we need a +//! more stable solution. Luckily we only need to support a very small subset of expression. +//! +//! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto +//! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3 +use std::{collections::BTreeSet, ops::Deref}; + +use data_types::timestamp::TimestampRange; +use datafusion::{ + logical_plan::{Column, Expr, Operator}, + scalar::ScalarValue, +}; +use generated_types::influxdata::iox::catalog::v1 as proto; +use snafu::{OptionExt, Snafu}; + +use crate::predicate::Predicate; + +#[derive(Debug, Snafu)] +pub enum SerializeError { + #[snafu(display("unsupported expression: {:?}", expr))] + UnsupportedExpression { expr: Expr }, + + #[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))] + UnsupportedOperants { left: Expr, right: Expr }, + + #[snafu(display("unsupported scalar value: {:?}", value))] + UnsupportedScalarValue { value: ScalarValue }, + + #[snafu(display("unsupported operator: {:?}", op))] + UnsupportedOperator { op: Operator }, +} + +/// Serialize IOx [`Predicate`] to a protobuf object. +pub fn serialize(predicate: &Predicate) -> Result { + let proto_predicate = proto::Predicate { + table_names: serialize_optional_string_set(&predicate.table_names), + field_columns: serialize_optional_string_set(&predicate.field_columns), + partition_key: serialize_optional_string(&predicate.partition_key), + range: serialize_timestamp_range(&predicate.range), + exprs: predicate + .exprs + .iter() + .map(serialize_expr) + .collect::, SerializeError>>()?, + }; + Ok(proto_predicate) +} + +fn serialize_optional_string_set( + set: &Option>, +) -> Option { + set.as_ref().map(|set| proto::OptionalStringSet { + values: set.iter().cloned().collect(), + }) +} + +fn serialize_optional_string(s: &Option) -> Option { + s.as_ref() + .map(|s| proto::OptionalString { value: s.clone() }) +} + +fn serialize_timestamp_range(r: &Option) -> Option { + r.as_ref().map(|r| proto::TimestampRange { + start: r.start, + end: r.end, + }) +} + +fn serialize_expr(expr: &Expr) -> Result { + match expr { + Expr::BinaryExpr { left, op, right } => { + let (column, scalar) = match (left.deref(), right.deref()) { + // The delete predicate parser currently only supports ``, not ``, + // however this could can easily be extended to support the latter case as well. + (Expr::Column(column), Expr::Literal(value)) => { + let column = column.name.clone(); + let scalar = serialize_scalar_value(value)?; + + (column, scalar) + } + (other_left, other_right) => { + return Err(SerializeError::UnsupportedOperants { + left: other_left.clone(), + right: other_right.clone(), + }); + } + }; + + let op = serialize_operator(op)?; + + let proto_expr = proto::Expr { + column, + op: op.into(), + scalar: Some(scalar), + }; + Ok(proto_expr) + } + other => Err(SerializeError::UnsupportedExpression { + expr: other.clone(), + }), + } +} + +fn serialize_scalar_value(value: &ScalarValue) -> Result { + // see https://github.com/apache/arrow-datafusion/blob/195b69995db8044ce283d72fb78eb6b74b8842f5/datafusion/src/sql/planner.rs#L274-L295 + match value { + ScalarValue::Utf8(Some(value)) => Ok(proto::Scalar { + value: Some(proto::scalar::Value::ValueString(value.clone())), + }), + ScalarValue::Int64(Some(value)) => Ok(proto::Scalar { + value: Some(proto::scalar::Value::ValueI64(*value)), + }), + ScalarValue::Float64(Some(value)) => Ok(proto::Scalar { + value: Some(proto::scalar::Value::ValueF64(*value)), + }), + ScalarValue::Boolean(Some(value)) => Ok(proto::Scalar { + value: Some(proto::scalar::Value::ValueBool(*value)), + }), + other => Err(SerializeError::UnsupportedScalarValue { + value: other.clone(), + }), + } +} + +fn serialize_operator(op: &Operator) -> Result { + match op { + Operator::Eq => Ok(proto::Op::Eq), + Operator::NotEq => Ok(proto::Op::Ne), + other => Err(SerializeError::UnsupportedOperator { op: *other }), + } +} + +#[derive(Debug, Snafu)] +pub enum DeserializeError { + #[snafu(display("unspecified operator"))] + UnspecifiedOperator, + + #[snafu(display("illegal operator enum value: {}", value))] + IllegalOperatorEnumValue { value: i32 }, + + #[snafu(display("missing scalar value"))] + MissingScalarValue, +} + +/// Deserialize IOx [`Predicate`] from a protobuf object. +pub fn deserialize( + proto_predicate: &proto::Predicate, + table_name: &str, +) -> Result { + let predicate = Predicate { + table_names: deserialize_optional_string_set(&proto_predicate.table_names), + field_columns: deserialize_optional_string_set(&proto_predicate.field_columns), + partition_key: deserialize_optional_string(&proto_predicate.partition_key), + range: deserialize_timestamp_range(&proto_predicate.range), + exprs: proto_predicate + .exprs + .iter() + .map(|expr| deserialize_expr(expr, table_name)) + .collect::, DeserializeError>>()?, + }; + Ok(predicate) +} + +fn deserialize_optional_string_set( + set: &Option, +) -> Option> { + set.as_ref().map(|set| set.values.iter().cloned().collect()) +} + +fn deserialize_optional_string(s: &Option) -> Option { + s.as_ref().map(|s| s.value.clone()) +} + +fn deserialize_timestamp_range(r: &Option) -> Option { + r.as_ref().map(|r| TimestampRange { + start: r.start, + end: r.end, + }) +} + +fn deserialize_expr(proto_expr: &proto::Expr, table_name: &str) -> Result { + let column = Column { + relation: Some(table_name.to_string()), + name: proto_expr.column.clone(), + }; + let op = deserialize_operator(&proto::Op::from_i32(proto_expr.op).context( + IllegalOperatorEnumValue { + value: proto_expr.op, + }, + )?)?; + let value = deserialize_scalar_value(&proto_expr.scalar)?; + + let expr = Expr::BinaryExpr { + left: Box::new(Expr::Column(column)), + op, + right: Box::new(Expr::Literal(value)), + }; + Ok(expr) +} + +fn deserialize_scalar_value( + value: &Option, +) -> Result { + match value + .as_ref() + .context(MissingScalarValue)? + .value + .as_ref() + .context(MissingScalarValue)? + { + proto::scalar::Value::ValueBool(value) => Ok(ScalarValue::Boolean(Some(*value))), + proto::scalar::Value::ValueI64(value) => Ok(ScalarValue::Int64(Some(*value))), + proto::scalar::Value::ValueU64(value) => Ok(ScalarValue::UInt64(Some(*value))), + proto::scalar::Value::ValueF64(value) => Ok(ScalarValue::Float64(Some(*value))), + proto::scalar::Value::ValueString(value) => Ok(ScalarValue::Utf8(Some(value.clone()))), + } +} + +fn deserialize_operator(op: &proto::Op) -> Result { + match op { + proto::Op::Unspecified => Err(DeserializeError::UnspecifiedOperator), + proto::Op::Eq => Ok(Operator::Eq), + proto::Op::Ne => Ok(Operator::NotEq), + } +} + +#[cfg(test)] +mod tests { + use crate::predicate::{ParseDeletePredicate, PredicateBuilder}; + + use super::*; + + #[test] + fn test_roundtrip() { + let table_name = "my_table"; + let predicate = delete_predicate(table_name); + let proto = serialize(&predicate).unwrap(); + let recovered = deserialize(&proto, table_name).unwrap(); + assert_eq!(predicate, recovered); + } + + fn delete_predicate(table_name: &str) -> Predicate { + let start_time = "11"; + let stop_time = "22"; + let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#; + + let parse_delete_pred = + ParseDeletePredicate::try_new(table_name, start_time, stop_time, predicate).unwrap(); + + let mut del_predicate_builder = PredicateBuilder::new() + .table(table_name) + .timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time); + + for expr in parse_delete_pred.predicate { + del_predicate_builder = del_predicate_builder.add_expr(expr); + } + + del_predicate_builder.build() + } +} From c5ebf4a2e6f6f93a79e5e1f8fd214f8785233a00 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Sep 2021 18:05:11 +0200 Subject: [PATCH 2/3] refactor: remove unused predicate serialization variants --- .../protos/influxdata/iox/catalog/v1/predicate.proto | 5 ++--- predicate/src/serialize.rs | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto b/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto index 08e2ff4393..bd68d7d7f6 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto @@ -74,8 +74,7 @@ message Scalar { oneof value { bool value_bool = 1; int64 value_i64 = 2; - uint64 value_u64 = 3; - double value_f64 = 4; - string value_string = 5; + double value_f64 = 3; + string value_string = 4; } } diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs index 037abf41f0..c29653c13e 100644 --- a/predicate/src/serialize.rs +++ b/predicate/src/serialize.rs @@ -213,7 +213,6 @@ fn deserialize_scalar_value( { proto::scalar::Value::ValueBool(value) => Ok(ScalarValue::Boolean(Some(*value))), proto::scalar::Value::ValueI64(value) => Ok(ScalarValue::Int64(Some(*value))), - proto::scalar::Value::ValueU64(value) => Ok(ScalarValue::UInt64(Some(*value))), proto::scalar::Value::ValueF64(value) => Ok(ScalarValue::Float64(Some(*value))), proto::scalar::Value::ValueString(value) => Ok(ScalarValue::Utf8(Some(value.clone()))), } From 72021e010f0e03e70d21447d6d8b93b54fb6ea46 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Sep 2021 18:18:05 +0200 Subject: [PATCH 3/3] test: test unsupported cases of predicate serialization --- Cargo.lock | 1 + predicate/Cargo.toml | 1 + predicate/src/serialize.rs | 73 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index bd2cbdfec0..c1b6b90938 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3156,6 +3156,7 @@ dependencies = [ "regex", "snafu", "sqlparser", + "test_helpers", "tokio", ] diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index 76dfd0975f..f6a48da350 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -17,4 +17,5 @@ snafu = "0.6.9" sqlparser = "0.10.0" [dev-dependencies] +test_helpers = { path = "../test_helpers" } tokio = { version = "1.11", features = ["macros"] } diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs index c29653c13e..2d34817de2 100644 --- a/predicate/src/serialize.rs +++ b/predicate/src/serialize.rs @@ -228,6 +228,9 @@ fn deserialize_operator(op: &proto::Op) -> Result { #[cfg(test)] mod tests { + use arrow::datatypes::DataType; + use test_helpers::assert_contains; + use crate::predicate::{ParseDeletePredicate, PredicateBuilder}; use super::*; @@ -241,6 +244,76 @@ mod tests { assert_eq!(predicate, recovered); } + #[test] + fn test_fail_serialize_unsupported_expression() { + let table_name = "my_table"; + let mut predicate = delete_predicate(table_name); + predicate.exprs.push(Expr::Not(Box::new(Expr::BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("x".to_string())))), + }))); + let err = serialize(&predicate).unwrap_err(); + assert_contains!(err.to_string(), "unsupported expression:"); + } + + #[test] + fn test_fail_serialize_unsupported_operants() { + let table_name = "my_table"; + let mut predicate = delete_predicate(table_name); + predicate.exprs.push(Expr::BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Column(Column { + relation: None, + name: "bar".to_string(), + })), + }); + let err = serialize(&predicate).unwrap_err(); + assert_contains!(err.to_string(), "unsupported operants:"); + } + + #[test] + fn test_fail_serialize_unsupported_scalar_value() { + let table_name = "my_table"; + let mut predicate = delete_predicate(table_name); + predicate.exprs.push(Expr::BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::List( + Some(Box::new(vec![])), + Box::new(DataType::Float64), + ))), + }); + let err = serialize(&predicate).unwrap_err(); + assert_contains!(err.to_string(), "unsupported scalar value:"); + } + + #[test] + fn test_fail_serialize_unsupported_operator() { + let table_name = "my_table"; + let mut predicate = delete_predicate(table_name); + predicate.exprs.push(Expr::BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Like, + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("x".to_string())))), + }); + let err = serialize(&predicate).unwrap_err(); + assert_contains!(err.to_string(), "unsupported operator:"); + } + fn delete_predicate(table_name: &str) -> Predicate { let start_time = "11"; let stop_time = "22";