diff --git a/Cargo.lock b/Cargo.lock index 3e43b0b20d..6081c8139a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2965,6 +2965,7 @@ dependencies = [ "generated_types", "internal_types", "observability_deps", + "ordered-float 2.8.0", "regex", "serde_json", "snafu", diff --git a/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto b/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto index bd68d7d7f6..79216c6e23 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/predicate.proto @@ -45,7 +45,7 @@ message TimestampRange { // Single expression to be used as parts of a predicate. // -// Only very simple expression of the type ` ` are supported. +// Only very simple expression of the type ` ` are supported. message Expr { // Column (w/o table name). string column = 1; diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index 6ffc8a7012..3948be19f1 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -12,6 +12,7 @@ datafusion_util = { path = "../datafusion_util" } generated_types = { path = "../generated_types" } internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } +ordered-float = "2" regex = "1" serde_json = "1.0.67" snafu = "0.6.9" diff --git a/predicate/src/expr.rs b/predicate/src/expr.rs new file mode 100644 index 0000000000..f91a0de06b --- /dev/null +++ b/predicate/src/expr.rs @@ -0,0 +1,496 @@ +use std::{ + convert::{TryFrom, TryInto}, + ops::Deref, +}; + +use generated_types::influxdata::iox::catalog::v1 as proto; +use ordered_float::OrderedFloat; +use snafu::{OptionExt, ResultExt, Snafu}; + +/// Single expression to be used as parts of a predicate. +/// +/// Only very simple expression of the type ` ` are supported. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Expr { + /// Column (w/o table name). + column: String, + + /// Operator. + op: Op, + + /// Scalar value. + scalar: Scalar, +} + +impl Expr { + /// Column (w/o table name). + pub fn column(&self) -> &str { + &self.column + } + + /// Operator. + pub fn op(&self) -> Op { + self.op + } + + /// Scalar value. + pub fn scalar(&self) -> &Scalar { + &self.scalar + } +} + +impl std::fmt::Display for Expr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}{}{}", self.column(), self.op(), self.scalar()) + } +} + +impl From for datafusion::logical_plan::Expr { + fn from(expr: Expr) -> Self { + let column = datafusion::logical_plan::Column { + relation: None, + name: expr.column, + }; + + datafusion::logical_plan::Expr::BinaryExpr { + left: Box::new(datafusion::logical_plan::Expr::Column(column)), + op: expr.op.into(), + right: Box::new(datafusion::logical_plan::Expr::Literal(expr.scalar.into())), + } + } +} + +#[derive(Debug, Snafu)] +pub enum ProtoToExprError { + #[snafu(display("cannot deserialize operator: {}", source))] + CannotDeserializeOperator { source: crate::expr::ProtoToOpError }, + + #[snafu(display("illegal operator enum value: {}", value))] + IllegalOperatorEnumValue { value: i32 }, + + #[snafu(display("missing scalar"))] + MissingScalar, + + #[snafu(display("cannot deserialize scalar: {}", source))] + CannotDeserializeScalar { + source: crate::expr::ProtoToScalarError, + }, +} + +impl TryFrom for Expr { + type Error = ProtoToExprError; + + fn try_from(expr: proto::Expr) -> Result { + let op = proto::Op::from_i32(expr.op) + .context(IllegalOperatorEnumValue { value: expr.op })? + .try_into() + .context(CannotDeserializeOperator)?; + + let scalar = expr + .clone() + .scalar + .context(MissingScalar)? + .try_into() + .context(CannotDeserializeScalar)?; + + Ok(Expr { + column: expr.column, + op, + scalar, + }) + } +} + +#[derive(Debug, Snafu)] +pub enum DataFusionToExprError { + #[snafu(display("unsupported expression: {:?}", expr))] + UnsupportedExpression { + expr: datafusion::logical_plan::Expr, + }, + + #[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))] + UnsupportedOperants { + left: datafusion::logical_plan::Expr, + right: datafusion::logical_plan::Expr, + }, + + #[snafu(display("cannot convert datafusion operator: {}", source))] + CannotConvertDataFusionOperator { + source: crate::expr::DataFusionToOpError, + }, + + #[snafu(display("cannot convert datafusion scalar value: {}", source))] + CannotConvertDataFusionScalarValue { + source: crate::expr::DataFusionToScalarError, + }, +} + +impl TryFrom for Expr { + type Error = DataFusionToExprError; + + fn try_from(expr: datafusion::logical_plan::Expr) -> Result { + match expr { + datafusion::logical_plan::Expr::BinaryExpr { left, op, right } => { + let (column, scalar) = match (left.deref(), right.deref()) { + // The delete predicate parser currently only supports ``, not ``, + // however this could can easily be extended to support the latter case as well. + ( + datafusion::logical_plan::Expr::Column(column), + datafusion::logical_plan::Expr::Literal(value), + ) => { + let column = column.name.clone(); + + let scalar: Scalar = value + .clone() + .try_into() + .context(CannotConvertDataFusionScalarValue)?; + + (column, scalar) + } + (other_left, other_right) => { + return Err(DataFusionToExprError::UnsupportedOperants { + left: other_left.clone(), + right: other_right.clone(), + }); + } + }; + + let op: Op = op.try_into().context(CannotConvertDataFusionOperator)?; + + Ok(Expr { column, op, scalar }) + } + other => Err(DataFusionToExprError::UnsupportedExpression { expr: other }), + } + } +} + +impl From for proto::Expr { + fn from(expr: Expr) -> Self { + let op: proto::Op = expr.op.into(); + + proto::Expr { + column: expr.column, + op: op.into(), + scalar: Some(expr.scalar.into()), + } + } +} + +/// Binary operator that can be evaluated on a column and a scalar value. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Op { + /// Strict equality (`=`). + Eq, + + /// Inequality (`!=`). + Ne, +} + +impl std::fmt::Display for Op { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Op::Eq => write!(f, "="), + Op::Ne => write!(f, "!="), + } + } +} + +impl From for datafusion::logical_plan::Operator { + fn from(op: Op) -> Self { + match op { + Op::Eq => datafusion::logical_plan::Operator::Eq, + Op::Ne => datafusion::logical_plan::Operator::NotEq, + } + } +} + +#[derive(Debug, Snafu)] +pub enum DataFusionToOpError { + #[snafu(display("unsupported operator: {:?}", op))] + UnsupportedOperator { + op: datafusion::logical_plan::Operator, + }, +} + +impl TryFrom for Op { + type Error = DataFusionToOpError; + + fn try_from(op: datafusion::logical_plan::Operator) -> Result { + match op { + datafusion::logical_plan::Operator::Eq => Ok(Op::Eq), + datafusion::logical_plan::Operator::NotEq => Ok(Op::Ne), + other => Err(DataFusionToOpError::UnsupportedOperator { op: other }), + } + } +} + +impl From for proto::Op { + fn from(op: Op) -> Self { + match op { + Op::Eq => proto::Op::Eq, + Op::Ne => proto::Op::Ne, + } + } +} + +#[derive(Debug, Snafu)] +pub enum ProtoToOpError { + #[snafu(display("unspecified operator"))] + UnspecifiedOperator, +} + +impl TryFrom for Op { + type Error = ProtoToOpError; + + fn try_from(op: proto::Op) -> Result { + match op { + proto::Op::Unspecified => Err(ProtoToOpError::UnspecifiedOperator), + proto::Op::Eq => Ok(Op::Eq), + proto::Op::Ne => Ok(Op::Ne), + } + } +} + +/// Scalar value of a certain type. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Scalar { + Bool(bool), + I64(i64), + F64(OrderedFloat), + String(String), +} + +impl std::fmt::Display for Scalar { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Scalar::Bool(value) => value.fmt(f), + Scalar::I64(value) => value.fmt(f), + Scalar::F64(value) => value.fmt(f), + Scalar::String(value) => write!(f, "'{}'", value), + } + } +} + +impl From for datafusion::scalar::ScalarValue { + fn from(scalar: Scalar) -> Self { + match scalar { + Scalar::Bool(value) => datafusion::scalar::ScalarValue::Boolean(Some(value)), + Scalar::I64(value) => datafusion::scalar::ScalarValue::Int64(Some(value)), + Scalar::F64(value) => datafusion::scalar::ScalarValue::Float64(Some(value.into())), + Scalar::String(value) => datafusion::scalar::ScalarValue::Utf8(Some(value)), + } + } +} + +#[derive(Debug, Snafu)] +pub enum ProtoToScalarError { + #[snafu(display("missing scalar value"))] + MissingScalarValue, +} + +impl TryFrom for Scalar { + type Error = ProtoToScalarError; + + fn try_from(scalar: proto::Scalar) -> Result { + match scalar.value.context(MissingScalarValue)? { + proto::scalar::Value::ValueBool(value) => Ok(Scalar::Bool(value)), + proto::scalar::Value::ValueI64(value) => Ok(Scalar::I64(value)), + proto::scalar::Value::ValueF64(value) => Ok(Scalar::F64(value.into())), + proto::scalar::Value::ValueString(value) => Ok(Scalar::String(value)), + } + } +} + +#[derive(Debug, Snafu)] +pub enum DataFusionToScalarError { + #[snafu(display("unsupported scalar value: {:?}", value))] + UnsupportedScalarValue { + value: datafusion::scalar::ScalarValue, + }, +} + +impl TryFrom for Scalar { + type Error = DataFusionToScalarError; + + fn try_from(scalar: datafusion::scalar::ScalarValue) -> Result { + // see https://github.com/apache/arrow-datafusion/blob/195b69995db8044ce283d72fb78eb6b74b8842f5/datafusion/src/sql/planner.rs#L274-L295 + match scalar { + datafusion::scalar::ScalarValue::Utf8(Some(value)) => Ok(Scalar::String(value)), + datafusion::scalar::ScalarValue::Int64(Some(value)) => Ok(Scalar::I64(value)), + datafusion::scalar::ScalarValue::Float64(Some(value)) => Ok(Scalar::F64(value.into())), + datafusion::scalar::ScalarValue::Boolean(Some(value)) => Ok(Scalar::Bool(value)), + other => Err(DataFusionToScalarError::UnsupportedScalarValue { value: other }), + } + } +} + +impl From for proto::Scalar { + fn from(scalar: Scalar) -> Self { + match scalar { + Scalar::Bool(value) => proto::Scalar { + value: Some(proto::scalar::Value::ValueBool(value)), + }, + Scalar::I64(value) => proto::Scalar { + value: Some(proto::scalar::Value::ValueI64(value)), + }, + Scalar::F64(value) => proto::Scalar { + value: Some(proto::scalar::Value::ValueF64(value.into())), + }, + Scalar::String(value) => proto::Scalar { + value: Some(proto::scalar::Value::ValueString(value)), + }, + } + } +} + +#[cfg(test)] +mod tests { + use test_helpers::assert_contains; + + use super::*; + + #[test] + fn test_roundtrips() { + assert_expr_works( + Expr { + column: "foo".to_string(), + op: Op::Eq, + scalar: Scalar::Bool(true), + }, + "foo=true", + ); + assert_expr_works( + Expr { + column: "bar".to_string(), + op: Op::Ne, + scalar: Scalar::I64(-1), + }, + "bar!=-1", + ); + assert_expr_works( + Expr { + column: "baz".to_string(), + op: Op::Eq, + scalar: Scalar::F64((-1.1).into()), + }, + "baz=-1.1", + ); + assert_expr_works( + Expr { + column: "col".to_string(), + op: Op::Eq, + scalar: Scalar::String("foo".to_string()), + }, + "col='foo'", + ); + } + + fn assert_expr_works(expr: Expr, display: &str) { + let df_expr: datafusion::logical_plan::Expr = expr.clone().into(); + let expr2: Expr = df_expr.try_into().unwrap(); + assert_eq!(expr2, expr); + + let proto_expr: proto::Expr = expr.clone().into(); + let expr3: Expr = proto_expr.try_into().unwrap(); + assert_eq!(expr3, expr); + + assert_eq!(expr.to_string(), display); + } + + #[test] + fn test_unsupported_expression() { + let expr = datafusion::logical_plan::Expr::Not(Box::new( + datafusion::logical_plan::Expr::BinaryExpr { + left: Box::new(datafusion::logical_plan::Expr::Column( + datafusion::logical_plan::Column { + relation: None, + name: "foo".to_string(), + }, + )), + op: datafusion::logical_plan::Operator::Eq, + right: Box::new(datafusion::logical_plan::Expr::Literal( + datafusion::scalar::ScalarValue::Utf8(Some("x".to_string())), + )), + }, + )); + let res: Result = expr.try_into(); + assert_contains!(res.unwrap_err().to_string(), "unsupported expression:"); + } + + #[test] + fn test_unsupported_operants() { + let expr = datafusion::logical_plan::Expr::BinaryExpr { + left: Box::new(datafusion::logical_plan::Expr::Column( + datafusion::logical_plan::Column { + relation: None, + name: "foo".to_string(), + }, + )), + op: datafusion::logical_plan::Operator::Eq, + right: Box::new(datafusion::logical_plan::Expr::Column( + datafusion::logical_plan::Column { + relation: None, + name: "bar".to_string(), + }, + )), + }; + let res: Result = expr.try_into(); + assert_contains!(res.unwrap_err().to_string(), "unsupported operants:"); + } + + #[test] + fn test_unsupported_scalar_value() { + let scalar = datafusion::scalar::ScalarValue::List( + Some(Box::new(vec![])), + Box::new(arrow::datatypes::DataType::Float64), + ); + let res: Result = scalar.try_into(); + assert_contains!(res.unwrap_err().to_string(), "unsupported scalar value:"); + } + + #[test] + fn test_unsupported_scalar_value_in_expr() { + let expr = datafusion::logical_plan::Expr::BinaryExpr { + left: Box::new(datafusion::logical_plan::Expr::Column( + datafusion::logical_plan::Column { + relation: None, + name: "foo".to_string(), + }, + )), + op: datafusion::logical_plan::Operator::Eq, + right: Box::new(datafusion::logical_plan::Expr::Literal( + datafusion::scalar::ScalarValue::List( + Some(Box::new(vec![])), + Box::new(arrow::datatypes::DataType::Float64), + ), + )), + }; + let res: Result = expr.try_into(); + assert_contains!(res.unwrap_err().to_string(), "unsupported scalar value:"); + } + + #[test] + fn test_unsupported_operator() { + let op = datafusion::logical_plan::Operator::Like; + let res: Result = op.try_into(); + assert_contains!(res.unwrap_err().to_string(), "unsupported operator:"); + } + + #[test] + fn test_unsupported_operator_in_expr() { + let expr = datafusion::logical_plan::Expr::BinaryExpr { + left: Box::new(datafusion::logical_plan::Expr::Column( + datafusion::logical_plan::Column { + relation: None, + name: "foo".to_string(), + }, + )), + op: datafusion::logical_plan::Operator::Like, + right: Box::new(datafusion::logical_plan::Expr::Literal( + datafusion::scalar::ScalarValue::Utf8(Some("x".to_string())), + )), + }; + let res: Result = expr.try_into(); + assert_contains!(res.unwrap_err().to_string(), "unsupported operator:"); + } +} diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index 0d433d12a1..1c10d58db0 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -1,3 +1,4 @@ +pub mod expr; pub mod predicate; pub mod regex; pub mod serialize; diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs index 5ea51a1669..83dedd73ab 100644 --- a/predicate/src/serialize.rs +++ b/predicate/src/serialize.rs @@ -6,31 +6,20 @@ //! //! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto //! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3 -use std::{collections::BTreeSet, ops::Deref}; +use std::{collections::BTreeSet, convert::TryInto}; use data_types::timestamp::TimestampRange; -use datafusion::{ - logical_plan::{Column, Expr, Operator}, - scalar::ScalarValue, -}; use generated_types::influxdata::iox::catalog::v1 as proto; -use snafu::{OptionExt, Snafu}; +use snafu::{ResultExt, Snafu}; -use crate::predicate::Predicate; +use crate::{expr::Expr, predicate::Predicate}; #[derive(Debug, Snafu)] pub enum SerializeError { - #[snafu(display("unsupported expression: {:?}", expr))] - UnsupportedExpression { expr: Expr }, - - #[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))] - UnsupportedOperants { left: Expr, right: Expr }, - - #[snafu(display("unsupported scalar value: {:?}", value))] - UnsupportedScalarValue { value: ScalarValue }, - - #[snafu(display("unsupported operator: {:?}", op))] - UnsupportedOperator { op: Operator }, + #[snafu(display("cannot convert datafusion expr: {}", source))] + CannotConvertDataFusionExpr { + source: crate::expr::DataFusionToExprError, + }, } /// Serialize IOx [`Predicate`] to a protobuf object. @@ -43,7 +32,13 @@ pub fn serialize(predicate: &Predicate) -> Result, SerializeError>>()?, }; Ok(proto_predicate) @@ -69,80 +64,12 @@ fn serialize_timestamp_range(r: &Option) -> Option Result { - match expr { - Expr::BinaryExpr { left, op, right } => { - let (column, scalar) = match (left.deref(), right.deref()) { - // The delete predicate parser currently only supports ``, not ``, - // however this could can easily be extended to support the latter case as well. - (Expr::Column(column), Expr::Literal(value)) => { - let column = column.name.clone(); - let scalar = serialize_scalar_value(value)?; - - (column, scalar) - } - (other_left, other_right) => { - return Err(SerializeError::UnsupportedOperants { - left: other_left.clone(), - right: other_right.clone(), - }); - } - }; - - let op = serialize_operator(op)?; - - let proto_expr = proto::Expr { - column, - op: op.into(), - scalar: Some(scalar), - }; - Ok(proto_expr) - } - other => Err(SerializeError::UnsupportedExpression { - expr: other.clone(), - }), - } -} - -fn serialize_scalar_value(value: &ScalarValue) -> Result { - // see https://github.com/apache/arrow-datafusion/blob/195b69995db8044ce283d72fb78eb6b74b8842f5/datafusion/src/sql/planner.rs#L274-L295 - match value { - ScalarValue::Utf8(Some(value)) => Ok(proto::Scalar { - value: Some(proto::scalar::Value::ValueString(value.clone())), - }), - ScalarValue::Int64(Some(value)) => Ok(proto::Scalar { - value: Some(proto::scalar::Value::ValueI64(*value)), - }), - ScalarValue::Float64(Some(value)) => Ok(proto::Scalar { - value: Some(proto::scalar::Value::ValueF64(*value)), - }), - ScalarValue::Boolean(Some(value)) => Ok(proto::Scalar { - value: Some(proto::scalar::Value::ValueBool(*value)), - }), - other => Err(SerializeError::UnsupportedScalarValue { - value: other.clone(), - }), - } -} - -fn serialize_operator(op: &Operator) -> Result { - match op { - Operator::Eq => Ok(proto::Op::Eq), - Operator::NotEq => Ok(proto::Op::Ne), - other => Err(SerializeError::UnsupportedOperator { op: *other }), - } -} - #[derive(Debug, Snafu)] pub enum DeserializeError { - #[snafu(display("unspecified operator"))] - UnspecifiedOperator, - - #[snafu(display("illegal operator enum value: {}", value))] - IllegalOperatorEnumValue { value: i32 }, - - #[snafu(display("missing scalar value"))] - MissingScalarValue, + #[snafu(display("cannot deserialize expr: {}", source))] + CannotDeserializeExpr { + source: crate::expr::ProtoToExprError, + }, } /// Deserialize IOx [`Predicate`] from a protobuf object. @@ -155,8 +82,11 @@ pub fn deserialize(proto_predicate: &proto::Predicate) -> Result, DeserializeError>>()?, + .map(|expr| { + let expr: Expr = expr.clone().try_into().context(CannotDeserializeExpr)?; + Ok(expr.into()) + }) + .collect::, DeserializeError>>()?, }; Ok(predicate) } @@ -178,56 +108,8 @@ fn deserialize_timestamp_range(r: &Option) -> Option