Merge pull request #2537 from influxdata/crepererum/issue2493c
feat: `Predicate` serializationpull/24376/head
@ -3150,11 +3150,13 @@ dependencies = [
"datafusion 0.1.0",
"datafusion 0.1.0",
@ -39,6 +39,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
@ -0,0 +1,80 @@
syntax = "proto3";
package influxdata.iox.catalog.v1;
// Represents a parsed predicate for evaluation by the InfluxDB IOx query engine.
message Predicate {
// Optional table restriction. If present, restricts the results to only tables these tables.
OptionalStringSet table_names = 1;
// Optional field restriction. If present, restricts the results to only tables which have *at least one* of the
// fields in field_columns.
OptionalStringSet field_columns = 2;
// Optional partition key filter
OptionalString partition_key = 3;
// Optional timestamp range: only rows within this range are included in results. Other rows are excluded.
TimestampRange range = 4;
// Optional arbitrary predicates, represented as list of expressions applied a logical conjunction (aka they are
// 'AND'ed together). Only rows that evaluate to TRUE for all these expressions should be returned. Other rows are
// excluded from the results.
repeated Expr exprs = 5;
// A optional string set.
// This is used instead of a `repeated string` to differenctiate between "empty set" and "none".
message OptionalStringSet {
repeated string values = 1;
// An optional string.
message OptionalString {
string value = 1;
// Specifies a continuous range of nanosecond timestamps.
message TimestampRange {
// Start defines the inclusive lower bound.
int64 start = 1;
// End defines the exclusive upper bound.
int64 end = 2;
// Single expression to be used as parts of a predicate.
// Only very simple expression of the type `<columng> <op> <scalar>` are supported.
message Expr {
// Column (w/o table name).
string column = 1;
// Operator.
Op op = 2;
// Scalar value.
Scalar scalar = 3;
// Binary operator that can be evaluated on a column and a scalar value.
enum Op {
// Unspecified operator, will result in an error.
// Strict equality (`=`).
OP_EQ = 1;
// Inequality (`!=`).
OP_NE = 2;
// Scalar value of a certain type.
message Scalar {
oneof value {
bool value_bool = 1;
int64 value_i64 = 2;
double value_f64 = 3;
string value_string = 4;
@ -9,6 +9,7 @@ chrono = "0.4"
data_types = { path = "../data_types" }
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
datafusion_util = { path = "../datafusion_util" }
generated_types = { path = "../generated_types" }
internal_types = { path = "../internal_types" }
internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
observability_deps = { path = "../observability_deps" }
regex = "1"
regex = "1"
@ -16,4 +17,5 @@ snafu = "0.6.9"
sqlparser = "0.10.0"
sqlparser = "0.10.0"
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.11", features = ["macros"] }
tokio = { version = "1.11", features = ["macros"] }
@ -1,2 +1,3 @@
pub mod predicate;
pub mod predicate;
pub mod regex;
pub mod regex;
pub mod serialize;
@ -0,0 +1,335 @@
//! Code to serialize and deserialize certain expressions.
//! Note that [Ballista] also provides a serialization using [Protocol Buffers 3]. However the protocol is meant as a
//! communication channel between workers and clients of Ballista, not for long term preservation. For IOx we need a
//! more stable solution. Luckily we only need to support a very small subset of expression.
//! [Ballista]:
//! [Protocol Buffers 3]:
use std::{collections::BTreeSet, ops::Deref};
use data_types::timestamp::TimestampRange;
use datafusion::{
logical_plan::{Column, Expr, Operator},
use generated_types::influxdata::iox::catalog::v1 as proto;
use snafu::{OptionExt, Snafu};
use crate::predicate::Predicate;
#[derive(Debug, Snafu)]
pub enum SerializeError {
#[snafu(display("unsupported expression: {:?}", expr))]
UnsupportedExpression { expr: Expr },
#[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))]
UnsupportedOperants { left: Expr, right: Expr },
#[snafu(display("unsupported scalar value: {:?}", value))]
UnsupportedScalarValue { value: ScalarValue },
#[snafu(display("unsupported operator: {:?}", op))]
UnsupportedOperator { op: Operator },
/// Serialize IOx [`Predicate`] to a protobuf object.
pub fn serialize(predicate: &Predicate) -> Result<proto::Predicate, SerializeError> {
let proto_predicate = proto::Predicate {
table_names: serialize_optional_string_set(&predicate.table_names),
field_columns: serialize_optional_string_set(&predicate.field_columns),
partition_key: serialize_optional_string(&predicate.partition_key),
range: serialize_timestamp_range(&predicate.range),
exprs: predicate
.collect::<Result<Vec<proto::Expr>, SerializeError>>()?,
fn serialize_optional_string_set(
set: &Option<BTreeSet<String>>,
) -> Option<proto::OptionalStringSet> {
set.as_ref().map(|set| proto::OptionalStringSet {
values: set.iter().cloned().collect(),
fn serialize_optional_string(s: &Option<String>) -> Option<proto::OptionalString> {
.map(|s| proto::OptionalString { value: s.clone() })
fn serialize_timestamp_range(r: &Option<TimestampRange>) -> Option<proto::TimestampRange> {
r.as_ref().map(|r| proto::TimestampRange {
start: r.start,
end: r.end,
fn serialize_expr(expr: &Expr) -> Result<proto::Expr, SerializeError> {
match expr {
Expr::BinaryExpr { left, op, right } => {
let (column, scalar) = match (left.deref(), right.deref()) {
// The delete predicate parser currently only supports `<column><op><value>`, not `<value><op><column>`,
// however this could can easily be extended to support the latter case as well.
(Expr::Column(column), Expr::Literal(value)) => {
let column =;
let scalar = serialize_scalar_value(value)?;
(column, scalar)
(other_left, other_right) => {
return Err(SerializeError::UnsupportedOperants {
left: other_left.clone(),
right: other_right.clone(),
let op = serialize_operator(op)?;
let proto_expr = proto::Expr {
op: op.into(),
scalar: Some(scalar),
other => Err(SerializeError::UnsupportedExpression {
expr: other.clone(),
fn serialize_scalar_value(value: &ScalarValue) -> Result<proto::Scalar, SerializeError> {
// see
match value {
ScalarValue::Utf8(Some(value)) => Ok(proto::Scalar {
value: Some(proto::scalar::Value::ValueString(value.clone())),
ScalarValue::Int64(Some(value)) => Ok(proto::Scalar {
value: Some(proto::scalar::Value::ValueI64(*value)),
ScalarValue::Float64(Some(value)) => Ok(proto::Scalar {
value: Some(proto::scalar::Value::ValueF64(*value)),
ScalarValue::Boolean(Some(value)) => Ok(proto::Scalar {
value: Some(proto::scalar::Value::ValueBool(*value)),
other => Err(SerializeError::UnsupportedScalarValue {
value: other.clone(),
fn serialize_operator(op: &Operator) -> Result<proto::Op, SerializeError> {
match op {
Operator::Eq => Ok(proto::Op::Eq),
Operator::NotEq => Ok(proto::Op::Ne),
other => Err(SerializeError::UnsupportedOperator { op: *other }),
#[derive(Debug, Snafu)]
pub enum DeserializeError {
#[snafu(display("unspecified operator"))]
#[snafu(display("illegal operator enum value: {}", value))]
IllegalOperatorEnumValue { value: i32 },
#[snafu(display("missing scalar value"))]
/// Deserialize IOx [`Predicate`] from a protobuf object.
pub fn deserialize(
proto_predicate: &proto::Predicate,
table_name: &str,
) -> Result<Predicate, DeserializeError> {
let predicate = Predicate {
table_names: deserialize_optional_string_set(&proto_predicate.table_names),
field_columns: deserialize_optional_string_set(&proto_predicate.field_columns),
partition_key: deserialize_optional_string(&proto_predicate.partition_key),
range: deserialize_timestamp_range(&proto_predicate.range),
exprs: proto_predicate
.map(|expr| deserialize_expr(expr, table_name))
.collect::<Result<Vec<Expr>, DeserializeError>>()?,
fn deserialize_optional_string_set(
set: &Option<proto::OptionalStringSet>,
) -> Option<BTreeSet<String>> {
set.as_ref().map(|set| set.values.iter().cloned().collect())
fn deserialize_optional_string(s: &Option<proto::OptionalString>) -> Option<String> {
s.as_ref().map(|s| s.value.clone())
fn deserialize_timestamp_range(r: &Option<proto::TimestampRange>) -> Option<TimestampRange> {
r.as_ref().map(|r| TimestampRange {
start: r.start,
end: r.end,
fn deserialize_expr(proto_expr: &proto::Expr, table_name: &str) -> Result<Expr, DeserializeError> {
let column = Column {
relation: Some(table_name.to_string()),
name: proto_expr.column.clone(),
let op = deserialize_operator(&proto::Op::from_i32(proto_expr.op).context(
IllegalOperatorEnumValue {
value: proto_expr.op,
let value = deserialize_scalar_value(&proto_expr.scalar)?;
let expr = Expr::BinaryExpr {
left: Box::new(Expr::Column(column)),
right: Box::new(Expr::Literal(value)),
fn deserialize_scalar_value(
value: &Option<proto::Scalar>,
) -> Result<ScalarValue, DeserializeError> {
match value
proto::scalar::Value::ValueBool(value) => Ok(ScalarValue::Boolean(Some(*value))),
proto::scalar::Value::ValueI64(value) => Ok(ScalarValue::Int64(Some(*value))),
proto::scalar::Value::ValueF64(value) => Ok(ScalarValue::Float64(Some(*value))),
proto::scalar::Value::ValueString(value) => Ok(ScalarValue::Utf8(Some(value.clone()))),
fn deserialize_operator(op: &proto::Op) -> Result<Operator, DeserializeError> {
match op {
proto::Op::Unspecified => Err(DeserializeError::UnspecifiedOperator),
proto::Op::Eq => Ok(Operator::Eq),
proto::Op::Ne => Ok(Operator::NotEq),
mod tests {
use arrow::datatypes::DataType;
use test_helpers::assert_contains;
use crate::predicate::{ParseDeletePredicate, PredicateBuilder};
use super::*;
fn test_roundtrip() {
let table_name = "my_table";
let predicate = delete_predicate(table_name);
let proto = serialize(&predicate).unwrap();
let recovered = deserialize(&proto, table_name).unwrap();
assert_eq!(predicate, recovered);
fn test_fail_serialize_unsupported_expression() {
let table_name = "my_table";
let mut predicate = delete_predicate(table_name);
predicate.exprs.push(Expr::Not(Box::new(Expr::BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("x".to_string())))),
let err = serialize(&predicate).unwrap_err();
assert_contains!(err.to_string(), "unsupported expression:");
fn test_fail_serialize_unsupported_operants() {
let table_name = "my_table";
let mut predicate = delete_predicate(table_name);
predicate.exprs.push(Expr::BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
op: Operator::Eq,
right: Box::new(Expr::Column(Column {
relation: None,
name: "bar".to_string(),
let err = serialize(&predicate).unwrap_err();
assert_contains!(err.to_string(), "unsupported operants:");
fn test_fail_serialize_unsupported_scalar_value() {
let table_name = "my_table";
let mut predicate = delete_predicate(table_name);
predicate.exprs.push(Expr::BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::List(
let err = serialize(&predicate).unwrap_err();
assert_contains!(err.to_string(), "unsupported scalar value:");
fn test_fail_serialize_unsupported_operator() {
let table_name = "my_table";
let mut predicate = delete_predicate(table_name);
predicate.exprs.push(Expr::BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
op: Operator::Like,
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("x".to_string())))),
let err = serialize(&predicate).unwrap_err();
assert_contains!(err.to_string(), "unsupported operator:");
fn delete_predicate(table_name: &str) -> Predicate {
let start_time = "11";
let stop_time = "22";
let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#;
let parse_delete_pred =
ParseDeletePredicate::try_new(table_name, start_time, stop_time, predicate).unwrap();
let mut del_predicate_builder = PredicateBuilder::new()
.timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time);
for expr in parse_delete_pred.predicate {
del_predicate_builder = del_predicate_builder.add_expr(expr);
Reference in New Issue