refactor: use dedicated in-memory `Expr` type during IO
Place a dedicated `Expr` type between datafusion and protobuf. While this type is currently only used during serialization, it will be used within `Predicate` in a follow-up change to enable de-duplication of predicates and avoid the double parsing of datafusion expressions (we are already somewhat parsing them when we check for valid delete predicates). This change looks larger than it is. In practice it just separates the conversion "datafusion => protobuf" into "datafusion => IOx => protobuf" and "protobuf => datafusion" into "protobuf => IOx => datafusion". So (apart from the error types) this is functionally the same.
pull/24376/head
parent
aa4001e8b8
commit
a0bbbdb197
|
@ -2965,6 +2965,7 @@ dependencies = [
|
|||
"generated_types",
|
||||
"internal_types",
|
||||
"observability_deps",
|
||||
"ordered-float 2.8.0",
|
||||
"regex",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
|
|
|
@ -45,7 +45,7 @@ message TimestampRange {
|
|||
|
||||
// Single expression to be used as parts of a predicate.
|
||||
//
|
||||
// Only very simple expression of the type `<columng> <op> <scalar>` are supported.
|
||||
// Only very simple expression of the type `<column> <op> <scalar>` are supported.
|
||||
message Expr {
|
||||
// Column (w/o table name).
|
||||
string column = 1;
|
||||
|
|
|
@ -12,6 +12,7 @@ datafusion_util = { path = "../datafusion_util" }
|
|||
generated_types = { path = "../generated_types" }
|
||||
internal_types = { path = "../internal_types" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
ordered-float = "2"
|
||||
regex = "1"
|
||||
serde_json = "1.0.67"
|
||||
snafu = "0.6.9"
|
||||
|
|
|
@ -0,0 +1,496 @@
|
|||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
ops::Deref,
|
||||
};
|
||||
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
use ordered_float::OrderedFloat;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
||||
/// Single expression to be used as parts of a predicate.
|
||||
///
|
||||
/// Only very simple expression of the type `<column> <op> <scalar>` are supported.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct Expr {
|
||||
/// Column (w/o table name).
|
||||
column: String,
|
||||
|
||||
/// Operator.
|
||||
op: Op,
|
||||
|
||||
/// Scalar value.
|
||||
scalar: Scalar,
|
||||
}
|
||||
|
||||
impl Expr {
|
||||
/// Column (w/o table name).
|
||||
pub fn column(&self) -> &str {
|
||||
&self.column
|
||||
}
|
||||
|
||||
/// Operator.
|
||||
pub fn op(&self) -> Op {
|
||||
self.op
|
||||
}
|
||||
|
||||
/// Scalar value.
|
||||
pub fn scalar(&self) -> &Scalar {
|
||||
&self.scalar
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Expr {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}{}{}", self.column(), self.op(), self.scalar())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Expr> for datafusion::logical_plan::Expr {
|
||||
fn from(expr: Expr) -> Self {
|
||||
let column = datafusion::logical_plan::Column {
|
||||
relation: None,
|
||||
name: expr.column,
|
||||
};
|
||||
|
||||
datafusion::logical_plan::Expr::BinaryExpr {
|
||||
left: Box::new(datafusion::logical_plan::Expr::Column(column)),
|
||||
op: expr.op.into(),
|
||||
right: Box::new(datafusion::logical_plan::Expr::Literal(expr.scalar.into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum ProtoToExprError {
|
||||
#[snafu(display("cannot deserialize operator: {}", source))]
|
||||
CannotDeserializeOperator { source: crate::expr::ProtoToOpError },
|
||||
|
||||
#[snafu(display("illegal operator enum value: {}", value))]
|
||||
IllegalOperatorEnumValue { value: i32 },
|
||||
|
||||
#[snafu(display("missing scalar"))]
|
||||
MissingScalar,
|
||||
|
||||
#[snafu(display("cannot deserialize scalar: {}", source))]
|
||||
CannotDeserializeScalar {
|
||||
source: crate::expr::ProtoToScalarError,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<proto::Expr> for Expr {
|
||||
type Error = ProtoToExprError;
|
||||
|
||||
fn try_from(expr: proto::Expr) -> Result<Self, Self::Error> {
|
||||
let op = proto::Op::from_i32(expr.op)
|
||||
.context(IllegalOperatorEnumValue { value: expr.op })?
|
||||
.try_into()
|
||||
.context(CannotDeserializeOperator)?;
|
||||
|
||||
let scalar = expr
|
||||
.clone()
|
||||
.scalar
|
||||
.context(MissingScalar)?
|
||||
.try_into()
|
||||
.context(CannotDeserializeScalar)?;
|
||||
|
||||
Ok(Expr {
|
||||
column: expr.column,
|
||||
op,
|
||||
scalar,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum DataFusionToExprError {
|
||||
#[snafu(display("unsupported expression: {:?}", expr))]
|
||||
UnsupportedExpression {
|
||||
expr: datafusion::logical_plan::Expr,
|
||||
},
|
||||
|
||||
#[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))]
|
||||
UnsupportedOperants {
|
||||
left: datafusion::logical_plan::Expr,
|
||||
right: datafusion::logical_plan::Expr,
|
||||
},
|
||||
|
||||
#[snafu(display("cannot convert datafusion operator: {}", source))]
|
||||
CannotConvertDataFusionOperator {
|
||||
source: crate::expr::DataFusionToOpError,
|
||||
},
|
||||
|
||||
#[snafu(display("cannot convert datafusion scalar value: {}", source))]
|
||||
CannotConvertDataFusionScalarValue {
|
||||
source: crate::expr::DataFusionToScalarError,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<datafusion::logical_plan::Expr> for Expr {
|
||||
type Error = DataFusionToExprError;
|
||||
|
||||
fn try_from(expr: datafusion::logical_plan::Expr) -> Result<Self, Self::Error> {
|
||||
match expr {
|
||||
datafusion::logical_plan::Expr::BinaryExpr { left, op, right } => {
|
||||
let (column, scalar) = match (left.deref(), right.deref()) {
|
||||
// The delete predicate parser currently only supports `<column><op><value>`, not `<value><op><column>`,
|
||||
// however this could can easily be extended to support the latter case as well.
|
||||
(
|
||||
datafusion::logical_plan::Expr::Column(column),
|
||||
datafusion::logical_plan::Expr::Literal(value),
|
||||
) => {
|
||||
let column = column.name.clone();
|
||||
|
||||
let scalar: Scalar = value
|
||||
.clone()
|
||||
.try_into()
|
||||
.context(CannotConvertDataFusionScalarValue)?;
|
||||
|
||||
(column, scalar)
|
||||
}
|
||||
(other_left, other_right) => {
|
||||
return Err(DataFusionToExprError::UnsupportedOperants {
|
||||
left: other_left.clone(),
|
||||
right: other_right.clone(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let op: Op = op.try_into().context(CannotConvertDataFusionOperator)?;
|
||||
|
||||
Ok(Expr { column, op, scalar })
|
||||
}
|
||||
other => Err(DataFusionToExprError::UnsupportedExpression { expr: other }),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Expr> for proto::Expr {
|
||||
fn from(expr: Expr) -> Self {
|
||||
let op: proto::Op = expr.op.into();
|
||||
|
||||
proto::Expr {
|
||||
column: expr.column,
|
||||
op: op.into(),
|
||||
scalar: Some(expr.scalar.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Comparison operator that is evaluated between a column and a scalar value.
///
/// The variant order is significant: the derived ordering sorts `Eq` before `Ne`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Op {
    /// Strict equality, rendered as `=`.
    Eq,

    /// Inequality, rendered as `!=`.
    Ne,
}
|
||||
|
||||
impl std::fmt::Display for Op {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Op::Eq => write!(f, "="),
|
||||
Op::Ne => write!(f, "!="),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Op> for datafusion::logical_plan::Operator {
|
||||
fn from(op: Op) -> Self {
|
||||
match op {
|
||||
Op::Eq => datafusion::logical_plan::Operator::Eq,
|
||||
Op::Ne => datafusion::logical_plan::Operator::NotEq,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum DataFusionToOpError {
|
||||
#[snafu(display("unsupported operator: {:?}", op))]
|
||||
UnsupportedOperator {
|
||||
op: datafusion::logical_plan::Operator,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<datafusion::logical_plan::Operator> for Op {
|
||||
type Error = DataFusionToOpError;
|
||||
|
||||
fn try_from(op: datafusion::logical_plan::Operator) -> Result<Self, Self::Error> {
|
||||
match op {
|
||||
datafusion::logical_plan::Operator::Eq => Ok(Op::Eq),
|
||||
datafusion::logical_plan::Operator::NotEq => Ok(Op::Ne),
|
||||
other => Err(DataFusionToOpError::UnsupportedOperator { op: other }),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Op> for proto::Op {
|
||||
fn from(op: Op) -> Self {
|
||||
match op {
|
||||
Op::Eq => proto::Op::Eq,
|
||||
Op::Ne => proto::Op::Ne,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum ProtoToOpError {
|
||||
#[snafu(display("unspecified operator"))]
|
||||
UnspecifiedOperator,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::Op> for Op {
|
||||
type Error = ProtoToOpError;
|
||||
|
||||
fn try_from(op: proto::Op) -> Result<Self, Self::Error> {
|
||||
match op {
|
||||
proto::Op::Unspecified => Err(ProtoToOpError::UnspecifiedOperator),
|
||||
proto::Op::Eq => Ok(Op::Eq),
|
||||
proto::Op::Ne => Ok(Op::Ne),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Scalar value of a certain type.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum Scalar {
|
||||
Bool(bool),
|
||||
I64(i64),
|
||||
F64(OrderedFloat<f64>),
|
||||
String(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Scalar {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Scalar::Bool(value) => value.fmt(f),
|
||||
Scalar::I64(value) => value.fmt(f),
|
||||
Scalar::F64(value) => value.fmt(f),
|
||||
Scalar::String(value) => write!(f, "'{}'", value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Scalar> for datafusion::scalar::ScalarValue {
|
||||
fn from(scalar: Scalar) -> Self {
|
||||
match scalar {
|
||||
Scalar::Bool(value) => datafusion::scalar::ScalarValue::Boolean(Some(value)),
|
||||
Scalar::I64(value) => datafusion::scalar::ScalarValue::Int64(Some(value)),
|
||||
Scalar::F64(value) => datafusion::scalar::ScalarValue::Float64(Some(value.into())),
|
||||
Scalar::String(value) => datafusion::scalar::ScalarValue::Utf8(Some(value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum ProtoToScalarError {
|
||||
#[snafu(display("missing scalar value"))]
|
||||
MissingScalarValue,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::Scalar> for Scalar {
|
||||
type Error = ProtoToScalarError;
|
||||
|
||||
fn try_from(scalar: proto::Scalar) -> Result<Self, Self::Error> {
|
||||
match scalar.value.context(MissingScalarValue)? {
|
||||
proto::scalar::Value::ValueBool(value) => Ok(Scalar::Bool(value)),
|
||||
proto::scalar::Value::ValueI64(value) => Ok(Scalar::I64(value)),
|
||||
proto::scalar::Value::ValueF64(value) => Ok(Scalar::F64(value.into())),
|
||||
proto::scalar::Value::ValueString(value) => Ok(Scalar::String(value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum DataFusionToScalarError {
|
||||
#[snafu(display("unsupported scalar value: {:?}", value))]
|
||||
UnsupportedScalarValue {
|
||||
value: datafusion::scalar::ScalarValue,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<datafusion::scalar::ScalarValue> for Scalar {
|
||||
type Error = DataFusionToScalarError;
|
||||
|
||||
fn try_from(scalar: datafusion::scalar::ScalarValue) -> Result<Self, Self::Error> {
|
||||
// see https://github.com/apache/arrow-datafusion/blob/195b69995db8044ce283d72fb78eb6b74b8842f5/datafusion/src/sql/planner.rs#L274-L295
|
||||
match scalar {
|
||||
datafusion::scalar::ScalarValue::Utf8(Some(value)) => Ok(Scalar::String(value)),
|
||||
datafusion::scalar::ScalarValue::Int64(Some(value)) => Ok(Scalar::I64(value)),
|
||||
datafusion::scalar::ScalarValue::Float64(Some(value)) => Ok(Scalar::F64(value.into())),
|
||||
datafusion::scalar::ScalarValue::Boolean(Some(value)) => Ok(Scalar::Bool(value)),
|
||||
other => Err(DataFusionToScalarError::UnsupportedScalarValue { value: other }),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Scalar> for proto::Scalar {
|
||||
fn from(scalar: Scalar) -> Self {
|
||||
match scalar {
|
||||
Scalar::Bool(value) => proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueBool(value)),
|
||||
},
|
||||
Scalar::I64(value) => proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueI64(value)),
|
||||
},
|
||||
Scalar::F64(value) => proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueF64(value.into())),
|
||||
},
|
||||
Scalar::String(value) => proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueString(value)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use test_helpers::assert_contains;

    use super::*;

    /// Builds an unqualified DataFusion column expression.
    fn df_column(name: &str) -> datafusion::logical_plan::Expr {
        datafusion::logical_plan::Expr::Column(datafusion::logical_plan::Column {
            relation: None,
            name: name.to_string(),
        })
    }

    /// Builds a DataFusion string literal expression.
    fn df_string_literal(value: &str) -> datafusion::logical_plan::Expr {
        datafusion::logical_plan::Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(
            value.to_string(),
        )))
    }

    /// Builds an empty DataFusion list scalar, which has no IOx `Scalar` counterpart.
    fn df_empty_list() -> datafusion::scalar::ScalarValue {
        datafusion::scalar::ScalarValue::List(
            Some(Box::new(vec![])),
            Box::new(arrow::datatypes::DataType::Float64),
        )
    }

    /// Builds the DataFusion binary expression `left <op> right`.
    fn df_binary(
        left: datafusion::logical_plan::Expr,
        op: datafusion::logical_plan::Operator,
        right: datafusion::logical_plan::Expr,
    ) -> datafusion::logical_plan::Expr {
        datafusion::logical_plan::Expr::BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        }
    }

    #[test]
    fn test_roundtrips() {
        let cases = vec![
            (
                Expr {
                    column: "foo".to_string(),
                    op: Op::Eq,
                    scalar: Scalar::Bool(true),
                },
                "foo=true",
            ),
            (
                Expr {
                    column: "bar".to_string(),
                    op: Op::Ne,
                    scalar: Scalar::I64(-1),
                },
                "bar!=-1",
            ),
            (
                Expr {
                    column: "baz".to_string(),
                    op: Op::Eq,
                    scalar: Scalar::F64((-1.1).into()),
                },
                "baz=-1.1",
            ),
            (
                Expr {
                    column: "col".to_string(),
                    op: Op::Eq,
                    scalar: Scalar::String("foo".to_string()),
                },
                "col='foo'",
            ),
        ];

        for (expr, display) in cases {
            assert_expr_works(expr, display);
        }
    }

    /// Checks that `expr` survives a round trip through DataFusion and through protobuf, and
    /// that it renders as `display`.
    fn assert_expr_works(expr: Expr, display: &str) {
        let via_datafusion: datafusion::logical_plan::Expr = expr.clone().into();
        let from_datafusion: Expr = via_datafusion.try_into().unwrap();
        assert_eq!(from_datafusion, expr);

        let via_proto: proto::Expr = expr.clone().into();
        let from_proto: Expr = via_proto.try_into().unwrap();
        assert_eq!(from_proto, expr);

        assert_eq!(expr.to_string(), display);
    }

    #[test]
    fn test_unsupported_expression() {
        let inner = df_binary(
            df_column("foo"),
            datafusion::logical_plan::Operator::Eq,
            df_string_literal("x"),
        );
        let expr = datafusion::logical_plan::Expr::Not(Box::new(inner));

        let err = Expr::try_from(expr).unwrap_err();
        assert_contains!(err.to_string(), "unsupported expression:");
    }

    #[test]
    fn test_unsupported_operants() {
        let expr = df_binary(
            df_column("foo"),
            datafusion::logical_plan::Operator::Eq,
            df_column("bar"),
        );

        let err = Expr::try_from(expr).unwrap_err();
        assert_contains!(err.to_string(), "unsupported operants:");
    }

    #[test]
    fn test_unsupported_scalar_value() {
        let err = Scalar::try_from(df_empty_list()).unwrap_err();
        assert_contains!(err.to_string(), "unsupported scalar value:");
    }

    #[test]
    fn test_unsupported_scalar_value_in_expr() {
        let expr = df_binary(
            df_column("foo"),
            datafusion::logical_plan::Operator::Eq,
            datafusion::logical_plan::Expr::Literal(df_empty_list()),
        );

        let err = Expr::try_from(expr).unwrap_err();
        assert_contains!(err.to_string(), "unsupported scalar value:");
    }

    #[test]
    fn test_unsupported_operator() {
        let err = Op::try_from(datafusion::logical_plan::Operator::Like).unwrap_err();
        assert_contains!(err.to_string(), "unsupported operator:");
    }

    #[test]
    fn test_unsupported_operator_in_expr() {
        let expr = df_binary(
            df_column("foo"),
            datafusion::logical_plan::Operator::Like,
            df_string_literal("x"),
        );

        let err = Expr::try_from(expr).unwrap_err();
        assert_contains!(err.to_string(), "unsupported operator:");
    }
}
|
|
@ -1,3 +1,4 @@
|
|||
pub mod expr;
|
||||
pub mod predicate;
|
||||
pub mod regex;
|
||||
pub mod serialize;
|
||||
|
|
|
@ -6,31 +6,20 @@
|
|||
//!
|
||||
//! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto
|
||||
//! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
|
||||
use std::{collections::BTreeSet, ops::Deref};
|
||||
use std::{collections::BTreeSet, convert::TryInto};
|
||||
|
||||
use data_types::timestamp::TimestampRange;
|
||||
use datafusion::{
|
||||
logical_plan::{Column, Expr, Operator},
|
||||
scalar::ScalarValue,
|
||||
};
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
use snafu::{OptionExt, Snafu};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use crate::predicate::Predicate;
|
||||
use crate::{expr::Expr, predicate::Predicate};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum SerializeError {
|
||||
#[snafu(display("unsupported expression: {:?}", expr))]
|
||||
UnsupportedExpression { expr: Expr },
|
||||
|
||||
#[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))]
|
||||
UnsupportedOperants { left: Expr, right: Expr },
|
||||
|
||||
#[snafu(display("unsupported scalar value: {:?}", value))]
|
||||
UnsupportedScalarValue { value: ScalarValue },
|
||||
|
||||
#[snafu(display("unsupported operator: {:?}", op))]
|
||||
UnsupportedOperator { op: Operator },
|
||||
#[snafu(display("cannot convert datafusion expr: {}", source))]
|
||||
CannotConvertDataFusionExpr {
|
||||
source: crate::expr::DataFusionToExprError,
|
||||
},
|
||||
}
|
||||
|
||||
/// Serialize IOx [`Predicate`] to a protobuf object.
|
||||
|
@ -43,7 +32,13 @@ pub fn serialize(predicate: &Predicate) -> Result<proto::Predicate, SerializeErr
|
|||
exprs: predicate
|
||||
.exprs
|
||||
.iter()
|
||||
.map(serialize_expr)
|
||||
.map(|expr| {
|
||||
let expr: Expr = expr
|
||||
.clone()
|
||||
.try_into()
|
||||
.context(CannotConvertDataFusionExpr)?;
|
||||
Ok(expr.into())
|
||||
})
|
||||
.collect::<Result<Vec<proto::Expr>, SerializeError>>()?,
|
||||
};
|
||||
Ok(proto_predicate)
|
||||
|
@ -69,80 +64,12 @@ fn serialize_timestamp_range(r: &Option<TimestampRange>) -> Option<proto::Timest
|
|||
})
|
||||
}
|
||||
|
||||
fn serialize_expr(expr: &Expr) -> Result<proto::Expr, SerializeError> {
|
||||
match expr {
|
||||
Expr::BinaryExpr { left, op, right } => {
|
||||
let (column, scalar) = match (left.deref(), right.deref()) {
|
||||
// The delete predicate parser currently only supports `<column><op><value>`, not `<value><op><column>`,
|
||||
// however this could can easily be extended to support the latter case as well.
|
||||
(Expr::Column(column), Expr::Literal(value)) => {
|
||||
let column = column.name.clone();
|
||||
let scalar = serialize_scalar_value(value)?;
|
||||
|
||||
(column, scalar)
|
||||
}
|
||||
(other_left, other_right) => {
|
||||
return Err(SerializeError::UnsupportedOperants {
|
||||
left: other_left.clone(),
|
||||
right: other_right.clone(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let op = serialize_operator(op)?;
|
||||
|
||||
let proto_expr = proto::Expr {
|
||||
column,
|
||||
op: op.into(),
|
||||
scalar: Some(scalar),
|
||||
};
|
||||
Ok(proto_expr)
|
||||
}
|
||||
other => Err(SerializeError::UnsupportedExpression {
|
||||
expr: other.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_scalar_value(value: &ScalarValue) -> Result<proto::Scalar, SerializeError> {
|
||||
// see https://github.com/apache/arrow-datafusion/blob/195b69995db8044ce283d72fb78eb6b74b8842f5/datafusion/src/sql/planner.rs#L274-L295
|
||||
match value {
|
||||
ScalarValue::Utf8(Some(value)) => Ok(proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueString(value.clone())),
|
||||
}),
|
||||
ScalarValue::Int64(Some(value)) => Ok(proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueI64(*value)),
|
||||
}),
|
||||
ScalarValue::Float64(Some(value)) => Ok(proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueF64(*value)),
|
||||
}),
|
||||
ScalarValue::Boolean(Some(value)) => Ok(proto::Scalar {
|
||||
value: Some(proto::scalar::Value::ValueBool(*value)),
|
||||
}),
|
||||
other => Err(SerializeError::UnsupportedScalarValue {
|
||||
value: other.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_operator(op: &Operator) -> Result<proto::Op, SerializeError> {
|
||||
match op {
|
||||
Operator::Eq => Ok(proto::Op::Eq),
|
||||
Operator::NotEq => Ok(proto::Op::Ne),
|
||||
other => Err(SerializeError::UnsupportedOperator { op: *other }),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum DeserializeError {
|
||||
#[snafu(display("unspecified operator"))]
|
||||
UnspecifiedOperator,
|
||||
|
||||
#[snafu(display("illegal operator enum value: {}", value))]
|
||||
IllegalOperatorEnumValue { value: i32 },
|
||||
|
||||
#[snafu(display("missing scalar value"))]
|
||||
MissingScalarValue,
|
||||
#[snafu(display("cannot deserialize expr: {}", source))]
|
||||
CannotDeserializeExpr {
|
||||
source: crate::expr::ProtoToExprError,
|
||||
},
|
||||
}
|
||||
|
||||
/// Deserialize IOx [`Predicate`] from a protobuf object.
|
||||
|
@ -155,8 +82,11 @@ pub fn deserialize(proto_predicate: &proto::Predicate) -> Result<Predicate, Dese
|
|||
exprs: proto_predicate
|
||||
.exprs
|
||||
.iter()
|
||||
.map(|expr| deserialize_expr(expr))
|
||||
.collect::<Result<Vec<Expr>, DeserializeError>>()?,
|
||||
.map(|expr| {
|
||||
let expr: Expr = expr.clone().try_into().context(CannotDeserializeExpr)?;
|
||||
Ok(expr.into())
|
||||
})
|
||||
.collect::<Result<Vec<datafusion::logical_plan::Expr>, DeserializeError>>()?,
|
||||
};
|
||||
Ok(predicate)
|
||||
}
|
||||
|
@ -178,56 +108,8 @@ fn deserialize_timestamp_range(r: &Option<proto::TimestampRange>) -> Option<Time
|
|||
})
|
||||
}
|
||||
|
||||
fn deserialize_expr(proto_expr: &proto::Expr) -> Result<Expr, DeserializeError> {
|
||||
let column = Column {
|
||||
relation: None,
|
||||
name: proto_expr.column.clone(),
|
||||
};
|
||||
let op = deserialize_operator(&proto::Op::from_i32(proto_expr.op).context(
|
||||
IllegalOperatorEnumValue {
|
||||
value: proto_expr.op,
|
||||
},
|
||||
)?)?;
|
||||
let value = deserialize_scalar_value(&proto_expr.scalar)?;
|
||||
|
||||
let expr = Expr::BinaryExpr {
|
||||
left: Box::new(Expr::Column(column)),
|
||||
op,
|
||||
right: Box::new(Expr::Literal(value)),
|
||||
};
|
||||
Ok(expr)
|
||||
}
|
||||
|
||||
fn deserialize_scalar_value(
|
||||
value: &Option<proto::Scalar>,
|
||||
) -> Result<ScalarValue, DeserializeError> {
|
||||
match value
|
||||
.as_ref()
|
||||
.context(MissingScalarValue)?
|
||||
.value
|
||||
.as_ref()
|
||||
.context(MissingScalarValue)?
|
||||
{
|
||||
proto::scalar::Value::ValueBool(value) => Ok(ScalarValue::Boolean(Some(*value))),
|
||||
proto::scalar::Value::ValueI64(value) => Ok(ScalarValue::Int64(Some(*value))),
|
||||
proto::scalar::Value::ValueF64(value) => Ok(ScalarValue::Float64(Some(*value))),
|
||||
proto::scalar::Value::ValueString(value) => Ok(ScalarValue::Utf8(Some(value.clone()))),
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_operator(op: &proto::Op) -> Result<Operator, DeserializeError> {
|
||||
match op {
|
||||
proto::Op::Unspecified => Err(DeserializeError::UnspecifiedOperator),
|
||||
proto::Op::Eq => Ok(Operator::Eq),
|
||||
proto::Op::Ne => Ok(Operator::NotEq),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::datatypes::DataType;
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
use crate::predicate::{ParseDeletePredicate, PredicateBuilder};
|
||||
|
||||
use super::*;
|
||||
|
@ -241,76 +123,6 @@ mod tests {
|
|||
assert_eq!(predicate, recovered);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fail_serialize_unsupported_expression() {
|
||||
let table_name = "my_table";
|
||||
let mut predicate = delete_predicate(table_name);
|
||||
predicate.exprs.push(Expr::Not(Box::new(Expr::BinaryExpr {
|
||||
left: Box::new(Expr::Column(Column {
|
||||
relation: None,
|
||||
name: "foo".to_string(),
|
||||
})),
|
||||
op: Operator::Eq,
|
||||
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("x".to_string())))),
|
||||
})));
|
||||
let err = serialize(&predicate).unwrap_err();
|
||||
assert_contains!(err.to_string(), "unsupported expression:");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fail_serialize_unsupported_operants() {
|
||||
let table_name = "my_table";
|
||||
let mut predicate = delete_predicate(table_name);
|
||||
predicate.exprs.push(Expr::BinaryExpr {
|
||||
left: Box::new(Expr::Column(Column {
|
||||
relation: None,
|
||||
name: "foo".to_string(),
|
||||
})),
|
||||
op: Operator::Eq,
|
||||
right: Box::new(Expr::Column(Column {
|
||||
relation: None,
|
||||
name: "bar".to_string(),
|
||||
})),
|
||||
});
|
||||
let err = serialize(&predicate).unwrap_err();
|
||||
assert_contains!(err.to_string(), "unsupported operants:");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fail_serialize_unsupported_scalar_value() {
|
||||
let table_name = "my_table";
|
||||
let mut predicate = delete_predicate(table_name);
|
||||
predicate.exprs.push(Expr::BinaryExpr {
|
||||
left: Box::new(Expr::Column(Column {
|
||||
relation: None,
|
||||
name: "foo".to_string(),
|
||||
})),
|
||||
op: Operator::Eq,
|
||||
right: Box::new(Expr::Literal(ScalarValue::List(
|
||||
Some(Box::new(vec![])),
|
||||
Box::new(DataType::Float64),
|
||||
))),
|
||||
});
|
||||
let err = serialize(&predicate).unwrap_err();
|
||||
assert_contains!(err.to_string(), "unsupported scalar value:");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fail_serialize_unsupported_operator() {
|
||||
let table_name = "my_table";
|
||||
let mut predicate = delete_predicate(table_name);
|
||||
predicate.exprs.push(Expr::BinaryExpr {
|
||||
left: Box::new(Expr::Column(Column {
|
||||
relation: None,
|
||||
name: "foo".to_string(),
|
||||
})),
|
||||
op: Operator::Like,
|
||||
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("x".to_string())))),
|
||||
});
|
||||
let err = serialize(&predicate).unwrap_err();
|
||||
assert_contains!(err.to_string(), "unsupported operator:");
|
||||
}
|
||||
|
||||
fn delete_predicate(table_name: &str) -> Predicate {
|
||||
let start_time = "11";
|
||||
let stop_time = "22";
|
||||
|
|
Loading…
Reference in New Issue