refactor: move delete parseing work out of influxdb_line_protocol crate
parent
33a2c061cd
commit
1effb11ad9
|
@ -1825,16 +1825,11 @@ dependencies = [
|
|||
name = "influxdb_line_protocol"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"datafusion 0.1.0",
|
||||
"nom",
|
||||
"observability_deps",
|
||||
"serde",
|
||||
"smallvec",
|
||||
"snafu",
|
||||
"sqlparser",
|
||||
"test_helpers",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3320,6 +3315,7 @@ dependencies = [
|
|||
"datafusion_util",
|
||||
"futures",
|
||||
"hashbrown",
|
||||
"influxdb_line_protocol",
|
||||
"internal_types",
|
||||
"itertools 0.10.1",
|
||||
"libc",
|
||||
|
@ -3328,7 +3324,9 @@ dependencies = [
|
|||
"parking_lot",
|
||||
"regex",
|
||||
"snafu",
|
||||
"sqlparser",
|
||||
"test_helpers",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"trace",
|
||||
|
|
|
@ -5,15 +5,10 @@ authors = ["Paul Dix <paul@pauldix.net>"]
|
|||
edition = "2018"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
chrono = "0.4"
|
||||
datafusion = { path = "../datafusion" }
|
||||
nom = "6"
|
||||
smallvec = "1.2.0"
|
||||
snafu = "0.6.2"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
sqlparser = "0.10.0"
|
||||
thiserror = "1.0.28"
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
test_helpers = { path = "../test_helpers" }
|
|
@ -1,461 +0,0 @@
|
|||
//! parse string that should be done at client before sending to server
|
||||
|
||||
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)]
|
||||
#![warn(
|
||||
missing_copy_implementations,
|
||||
missing_debug_implementations,
|
||||
clippy::explicit_iter_loop,
|
||||
clippy::use_self,
|
||||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
use datafusion::logical_plan::{lit, Column, Expr, Operator};
|
||||
|
||||
use sqlparser::{
|
||||
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
|
||||
dialect::GenericDialect,
|
||||
parser::Parser,
|
||||
};
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
use chrono::DateTime;
|
||||
|
||||
use crate::timestamp;
|
||||
|
||||
/// Parse Error
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// Invalid time format
|
||||
#[error("Invalid timestamp: {}", .0)]
|
||||
InvalidTimestamp(String),
|
||||
|
||||
/// Invalid time range
|
||||
#[error("Invalid time range: ({}, {})", .0, .1)]
|
||||
InvalidTimeRange(String, String),
|
||||
|
||||
/// Predicate syntax error
|
||||
#[error("Invalid predicate syntax: ({})", .0)]
|
||||
InvalidSyntax(String),
|
||||
|
||||
/// Predicate semantics error
|
||||
#[error("Invalid predicate semantics: ({})", .0)]
|
||||
InvalidSemantics(String),
|
||||
|
||||
/// Predicate include non supported expression
|
||||
#[error("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", .0)]
|
||||
NotSupportPredicate(String),
|
||||
}
|
||||
|
||||
/// Result type for Parser Cient
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Parser for Delete predicate and time range
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ParseDeletePredicate {
|
||||
pub start_time: i64,
|
||||
pub stop_time: i64,
|
||||
// conjunctive predicate of binary expressions of = or !=
|
||||
pub predicate: Vec<Expr>,
|
||||
}
|
||||
|
||||
impl ParseDeletePredicate {
|
||||
/// Create a ParseDeletePredicate
|
||||
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<Expr>) -> Self {
|
||||
Self {
|
||||
start_time,
|
||||
stop_time,
|
||||
predicate,
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
|
||||
pub fn try_new(table_name: &str, start: &str, stop: &str, predicate: &str) -> Result<Self> {
|
||||
// parse and check time range
|
||||
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
|
||||
|
||||
// Parse the predicate
|
||||
let delete_exprs = Self::parse_predicate(table_name, predicate)?;
|
||||
|
||||
Ok(Self::new(start_time, stop_time, delete_exprs))
|
||||
}
|
||||
|
||||
/// Parse the predicate and convert it into datafusion expression
|
||||
/// A delete predicate is a conjunctive expression of many
|
||||
/// binary expressions of 'colum = constant' or 'column != constant'
|
||||
///
|
||||
pub fn parse_predicate(table_name: &str, predicate: &str) -> Result<Vec<Expr>> {
|
||||
if predicate.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Now add this predicate string into a DELETE SQL to user sqlparser to parse it
|
||||
// "DELETE FROM table_name WHERE predicate"
|
||||
let mut sql = "DELETE FROM ".to_string();
|
||||
sql.push_str(table_name);
|
||||
sql.push_str(" WHERE ");
|
||||
sql.push_str(predicate);
|
||||
|
||||
// parse the delete sql
|
||||
let dialect = GenericDialect {};
|
||||
let ast = Parser::parse_sql(&dialect, sql.as_str());
|
||||
match ast {
|
||||
Err(parse_err) => {
|
||||
let error_str = format!("{}, {}", predicate, parse_err);
|
||||
Err(Error::InvalidSyntax(error_str))
|
||||
}
|
||||
Ok(mut stmt) => {
|
||||
if stmt.len() != 1 {
|
||||
return Err(Error::InvalidSemantics(predicate.to_string()));
|
||||
}
|
||||
|
||||
let stmt = stmt.pop();
|
||||
match stmt {
|
||||
Some(Statement::Delete {
|
||||
table_name: _,
|
||||
selection: Some(expr),
|
||||
..
|
||||
}) => {
|
||||
// split this expr into smaller binary if any
|
||||
let mut exprs = vec![];
|
||||
let split = Self::split_members(table_name, &expr, &mut exprs);
|
||||
if !split {
|
||||
return Err(Error::NotSupportPredicate(predicate.to_string()));
|
||||
}
|
||||
Ok(exprs)
|
||||
}
|
||||
_ => Err(Error::InvalidSemantics(predicate.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Recursively split all "AND" expressions into smaller ones
|
||||
/// Example: "A AND B AND C" => [A, B, C]
|
||||
/// Return false if not all of them are AND of binary expression of
|
||||
/// "column_name = literal" or "column_name != literal"
|
||||
///
|
||||
/// The split expressions will be converted into data fusion expressions
|
||||
pub fn split_members(
|
||||
table_name: &str,
|
||||
predicate: &SqlParserExpr,
|
||||
predicates: &mut Vec<Expr>,
|
||||
) -> bool {
|
||||
// The below code built to be compatible with
|
||||
// https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go
|
||||
match predicate {
|
||||
SqlParserExpr::BinaryOp {
|
||||
left,
|
||||
op: BinaryOperator::And,
|
||||
right,
|
||||
} => {
|
||||
if !Self::split_members(table_name, left, predicates) {
|
||||
return false;
|
||||
}
|
||||
if !Self::split_members(table_name, right, predicates) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
SqlParserExpr::BinaryOp { left, op, right } => {
|
||||
// Verify Operator
|
||||
let op = match op {
|
||||
BinaryOperator::Eq => Operator::Eq,
|
||||
BinaryOperator::NotEq => Operator::NotEq,
|
||||
_ => return false,
|
||||
};
|
||||
|
||||
// verify if left is identifier (column name)
|
||||
let column = match &**left {
|
||||
SqlParserExpr::Identifier(Ident {
|
||||
value,
|
||||
quote_style: _, // all quotes are ignored as done in idpe
|
||||
}) => Expr::Column(Column {
|
||||
relation: Some(table_name.to_string()),
|
||||
name: value.to_string(),
|
||||
}),
|
||||
_ => return false, // not a column name
|
||||
};
|
||||
|
||||
// verify if right is a literal or an identifier (e.g column name)
|
||||
let value = match &**right {
|
||||
SqlParserExpr::Identifier(Ident {
|
||||
value,
|
||||
quote_style: _,
|
||||
}) => lit(value.to_string()),
|
||||
SqlParserExpr::Value(Value::DoubleQuotedString(value)) => {
|
||||
lit(value.to_string())
|
||||
}
|
||||
SqlParserExpr::Value(Value::SingleQuotedString(value)) => {
|
||||
lit(value.to_string())
|
||||
}
|
||||
SqlParserExpr::Value(Value::NationalStringLiteral(value)) => {
|
||||
lit(value.to_string())
|
||||
}
|
||||
SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()),
|
||||
SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::<i64>() {
|
||||
Ok(v) => lit(v),
|
||||
Err(_) => lit(v.parse::<f64>().unwrap()),
|
||||
},
|
||||
SqlParserExpr::Value(Value::Boolean(v)) => lit(*v),
|
||||
_ => return false, // not a literal
|
||||
};
|
||||
|
||||
let expr = Expr::BinaryExpr {
|
||||
left: Box::new(column),
|
||||
op,
|
||||
right: Box::new(value),
|
||||
};
|
||||
predicates.push(expr);
|
||||
}
|
||||
_ => return false,
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Parse a time and return its time in nanosecond
|
||||
pub fn parse_time(input: &str) -> Result<i64> {
|
||||
// This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z
|
||||
// See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame
|
||||
let datetime_result = DateTime::parse_from_rfc3339(input);
|
||||
match datetime_result {
|
||||
Ok(datetime) => Ok(datetime.timestamp_nanos()),
|
||||
Err(timestamp_err) => {
|
||||
// See if it is in nanosecond form
|
||||
let time_result = timestamp(input);
|
||||
match time_result {
|
||||
Ok((_, nano)) => Ok(nano),
|
||||
Err(nano_err) => {
|
||||
// wrong format, return both error
|
||||
let error_str = format!("{}, {}", timestamp_err, nano_err);
|
||||
Err(Error::InvalidTimestamp(error_str))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a time range [start, stop]
|
||||
pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> {
|
||||
let start_time = Self::parse_time(start)?;
|
||||
let stop_time = Self::parse_time(stop)?;
|
||||
if start_time > stop_time {
|
||||
return Err(Error::InvalidTimeRange(start.to_string(), stop.to_string()));
|
||||
}
|
||||
|
||||
Ok((start_time, stop_time))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_time_range_valid() {
|
||||
let start = r#"100"#;
|
||||
let stop = r#"100"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (100, 100);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"200"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (100, 200);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"1970-01-01T00:00:00Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 0);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
// let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
// let stop = r#"now()"#; // -- Not working. Need to find a way to test this
|
||||
// let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
// let expected = (0, 0);
|
||||
// assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#;
|
||||
let stop = r#"100"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 100);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#;
|
||||
let stop = r#"1970-01-01T00:01:00Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 60000000000);
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_time_range_invalid() {
|
||||
let start = r#"100"#;
|
||||
let stop = r#"-100"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"50"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"1970-01-01T00:00:00Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"1971-09-01T00:00:10Z"#;
|
||||
let stop = r#"1971-09-01T00:00:05Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp() {
|
||||
let input = r#"123"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 123);
|
||||
|
||||
// must parse time
|
||||
let input = r#"1970-01-01T00:00:00Z"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 0);
|
||||
|
||||
let input = r#"1971-02-01T15:30:21Z"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 34270221000000000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp_negative() {
|
||||
let input = r#"-123"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, -123);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp_invalid() {
|
||||
// It turn out this is not invalid but return1 123
|
||||
let input = r#"123gdb"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 123);
|
||||
//assert!(time.is_err());
|
||||
|
||||
// must parse time
|
||||
// It turn out this is not invalid but return1 1970
|
||||
let input = r#"1970-01-01T00:00:00"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 1970);
|
||||
//assert!(time.is_err());
|
||||
|
||||
// It turn out this is not invalid but return1 1971
|
||||
let input = r#"1971-02-01:30:21Z"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 1971);
|
||||
//assert!(time.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp_out_of_range() {
|
||||
let input = r#"99999999999999999999999999999999"#;
|
||||
let time = ParseDeletePredicate::parse_time(input);
|
||||
assert!(time.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_predicate() {
|
||||
let table = "test";
|
||||
|
||||
let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#;
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred).unwrap();
|
||||
|
||||
println!("{:#?}", result);
|
||||
|
||||
let mut expected = vec![];
|
||||
let e = col("test.city").eq(lit("Boston"));
|
||||
expected.push(e);
|
||||
let val: i64 = 100;
|
||||
let e = col("test.cost").not_eq(lit(val));
|
||||
expected.push(e);
|
||||
let e = col("test.state").not_eq(lit("MA"));
|
||||
expected.push(e);
|
||||
let e = col("test.temp").eq(lit(87.5));
|
||||
expected.push(e);
|
||||
|
||||
assert_eq!(result, expected)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_predicate_invalid() {
|
||||
let table = "test";
|
||||
|
||||
let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost > 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost <= 100"#; // <
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost gt 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"city = cost = 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred() {
|
||||
let table = "test";
|
||||
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"200"#;
|
||||
let pred = r#"cost != 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred).unwrap();
|
||||
assert_eq!(result.start_time, 0);
|
||||
assert_eq!(result.stop_time, 200);
|
||||
|
||||
let mut expected = vec![];
|
||||
let num: i64 = 100;
|
||||
let e = col("test.cost").not_eq(lit(num));
|
||||
expected.push(e);
|
||||
assert_eq!(result.predicate, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred_invalid_time_range() {
|
||||
let table = "test";
|
||||
let start = r#"100"#;
|
||||
let stop = r#"50"#;
|
||||
let pred = r#"cost != 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred_invalid_pred() {
|
||||
let table = "test";
|
||||
let start = r#"100"#;
|
||||
let stop = r#"200"#;
|
||||
let pred = r#"cost > 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
|
@ -16,8 +16,6 @@
|
|||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
pub mod delete_parser;
|
||||
|
||||
use fmt::Display;
|
||||
use nom::{
|
||||
branch::alt,
|
||||
|
|
|
@ -24,16 +24,20 @@ datafusion = { path = "../datafusion" }
|
|||
datafusion_util = { path = "../datafusion_util" }
|
||||
futures = "0.3"
|
||||
hashbrown = "0.11"
|
||||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||
internal_types = { path = "../internal_types" }
|
||||
metrics = { path = "../metrics" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.11.2"
|
||||
regex = "1"
|
||||
snafu = "0.6.3"
|
||||
sqlparser = "0.10.0"
|
||||
thiserror = "1.0.28"
|
||||
tokio = { version = "1.11", features = ["macros"] }
|
||||
tokio-stream = "0.1.2"
|
||||
trace = { path = "../trace" }
|
||||
tracker = { path = "../tracker" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
|
||||
|
||||
# use libc on unix like platforms to set worker priority in DedicatedExecutor
|
||||
[target."cfg(unix)".dependencies.libc]
|
||||
|
|
|
@ -11,12 +11,22 @@ use std::{
|
|||
use data_types::timestamp::TimestampRange;
|
||||
use datafusion::{
|
||||
error::DataFusionError,
|
||||
logical_plan::{col, Expr, Operator},
|
||||
logical_plan::{col, lit, Column, Expr, Operator},
|
||||
optimizer::utils,
|
||||
};
|
||||
use datafusion_util::{make_range_expr, AndExprBuilder};
|
||||
use influxdb_line_protocol::timestamp;
|
||||
use internal_types::schema::TIME_COLUMN_NAME;
|
||||
use observability_deps::tracing::debug;
|
||||
use sqlparser::{
|
||||
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
|
||||
dialect::GenericDialect,
|
||||
parser::Parser,
|
||||
};
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
use chrono::DateTime;
|
||||
|
||||
/// This `Predicate` represents the empty predicate (aka that
|
||||
/// evaluates to true for all rows).
|
||||
|
@ -393,6 +403,231 @@ impl PredicateBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
// Parse Delete Predicates
|
||||
/// Parse Error
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// Invalid time format
|
||||
#[error("Invalid timestamp: {}", .0)]
|
||||
InvalidTimestamp(String),
|
||||
|
||||
/// Invalid time range
|
||||
#[error("Invalid time range: ({}, {})", .0, .1)]
|
||||
InvalidTimeRange(String, String),
|
||||
|
||||
/// Predicate syntax error
|
||||
#[error("Invalid predicate syntax: ({})", .0)]
|
||||
InvalidSyntax(String),
|
||||
|
||||
/// Predicate semantics error
|
||||
#[error("Invalid predicate semantics: ({})", .0)]
|
||||
InvalidSemantics(String),
|
||||
|
||||
/// Predicate include non supported expression
|
||||
#[error("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", .0)]
|
||||
NotSupportPredicate(String),
|
||||
}
|
||||
|
||||
/// Result type for Parser Cient
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Parser for Delete predicate and time range
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ParseDeletePredicate {
|
||||
pub start_time: i64,
|
||||
pub stop_time: i64,
|
||||
// conjunctive predicate of binary expressions of = or !=
|
||||
pub predicate: Vec<Expr>,
|
||||
}
|
||||
|
||||
impl ParseDeletePredicate {
|
||||
/// Create a ParseDeletePredicate
|
||||
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<Expr>) -> Self {
|
||||
Self {
|
||||
start_time,
|
||||
stop_time,
|
||||
predicate,
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
|
||||
pub fn try_new(table_name: &str, start: &str, stop: &str, predicate: &str) -> Result<Self> {
|
||||
// parse and check time range
|
||||
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
|
||||
|
||||
// Parse the predicate
|
||||
let delete_exprs = Self::parse_predicate(table_name, predicate)?;
|
||||
|
||||
Ok(Self::new(start_time, stop_time, delete_exprs))
|
||||
}
|
||||
|
||||
/// Parse the predicate and convert it into datafusion expression
|
||||
/// A delete predicate is a conjunctive expression of many
|
||||
/// binary expressions of 'colum = constant' or 'column != constant'
|
||||
///
|
||||
pub fn parse_predicate(table_name: &str, predicate: &str) -> Result<Vec<Expr>> {
|
||||
if predicate.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Now add this predicate string into a DELETE SQL to user sqlparser to parse it
|
||||
// "DELETE FROM table_name WHERE predicate"
|
||||
let mut sql = "DELETE FROM ".to_string();
|
||||
sql.push_str(table_name);
|
||||
sql.push_str(" WHERE ");
|
||||
sql.push_str(predicate);
|
||||
|
||||
// parse the delete sql
|
||||
let dialect = GenericDialect {};
|
||||
let ast = Parser::parse_sql(&dialect, sql.as_str());
|
||||
match ast {
|
||||
Err(parse_err) => {
|
||||
let error_str = format!("{}, {}", predicate, parse_err);
|
||||
Err(Error::InvalidSyntax(error_str))
|
||||
}
|
||||
Ok(mut stmt) => {
|
||||
if stmt.len() != 1 {
|
||||
return Err(Error::InvalidSemantics(predicate.to_string()));
|
||||
}
|
||||
|
||||
let stmt = stmt.pop();
|
||||
match stmt {
|
||||
Some(Statement::Delete {
|
||||
table_name: _,
|
||||
selection: Some(expr),
|
||||
..
|
||||
}) => {
|
||||
// split this expr into smaller binary if any
|
||||
let mut exprs = vec![];
|
||||
let split = Self::split_members(table_name, &expr, &mut exprs);
|
||||
if !split {
|
||||
return Err(Error::NotSupportPredicate(predicate.to_string()));
|
||||
}
|
||||
Ok(exprs)
|
||||
}
|
||||
_ => Err(Error::InvalidSemantics(predicate.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Recursively split all "AND" expressions into smaller ones
|
||||
/// Example: "A AND B AND C" => [A, B, C]
|
||||
/// Return false if not all of them are AND of binary expression of
|
||||
/// "column_name = literal" or "column_name != literal"
|
||||
///
|
||||
/// The split expressions will be converted into data fusion expressions
|
||||
pub fn split_members(
|
||||
table_name: &str,
|
||||
predicate: &SqlParserExpr,
|
||||
predicates: &mut Vec<Expr>,
|
||||
) -> bool {
|
||||
// The below code built to be compatible with
|
||||
// https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go
|
||||
match predicate {
|
||||
SqlParserExpr::BinaryOp {
|
||||
left,
|
||||
op: BinaryOperator::And,
|
||||
right,
|
||||
} => {
|
||||
if !Self::split_members(table_name, left, predicates) {
|
||||
return false;
|
||||
}
|
||||
if !Self::split_members(table_name, right, predicates) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
SqlParserExpr::BinaryOp { left, op, right } => {
|
||||
// Verify Operator
|
||||
let op = match op {
|
||||
BinaryOperator::Eq => Operator::Eq,
|
||||
BinaryOperator::NotEq => Operator::NotEq,
|
||||
_ => return false,
|
||||
};
|
||||
|
||||
// verify if left is identifier (column name)
|
||||
let column = match &**left {
|
||||
SqlParserExpr::Identifier(Ident {
|
||||
value,
|
||||
quote_style: _, // all quotes are ignored as done in idpe
|
||||
}) => Expr::Column(Column {
|
||||
relation: Some(table_name.to_string()),
|
||||
name: value.to_string(),
|
||||
}),
|
||||
_ => return false, // not a column name
|
||||
};
|
||||
|
||||
// verify if right is a literal or an identifier (e.g column name)
|
||||
let value = match &**right {
|
||||
SqlParserExpr::Identifier(Ident {
|
||||
value,
|
||||
quote_style: _,
|
||||
}) => lit(value.to_string()),
|
||||
SqlParserExpr::Value(Value::DoubleQuotedString(value)) => {
|
||||
lit(value.to_string())
|
||||
}
|
||||
SqlParserExpr::Value(Value::SingleQuotedString(value)) => {
|
||||
lit(value.to_string())
|
||||
}
|
||||
SqlParserExpr::Value(Value::NationalStringLiteral(value)) => {
|
||||
lit(value.to_string())
|
||||
}
|
||||
SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()),
|
||||
SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::<i64>() {
|
||||
Ok(v) => lit(v),
|
||||
Err(_) => lit(v.parse::<f64>().unwrap()),
|
||||
},
|
||||
SqlParserExpr::Value(Value::Boolean(v)) => lit(*v),
|
||||
_ => return false, // not a literal
|
||||
};
|
||||
|
||||
let expr = Expr::BinaryExpr {
|
||||
left: Box::new(column),
|
||||
op,
|
||||
right: Box::new(value),
|
||||
};
|
||||
predicates.push(expr);
|
||||
}
|
||||
_ => return false,
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Parse a time and return its time in nanosecond
|
||||
pub fn parse_time(input: &str) -> Result<i64> {
|
||||
// This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z
|
||||
// See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame
|
||||
let datetime_result = DateTime::parse_from_rfc3339(input);
|
||||
match datetime_result {
|
||||
Ok(datetime) => Ok(datetime.timestamp_nanos()),
|
||||
Err(timestamp_err) => {
|
||||
// See if it is in nanosecond form
|
||||
let time_result = timestamp(input);
|
||||
match time_result {
|
||||
Ok((_, nano)) => Ok(nano),
|
||||
Err(nano_err) => {
|
||||
// wrong format, return both error
|
||||
let error_str = format!("{}, {}", timestamp_err, nano_err);
|
||||
Err(Error::InvalidTimestamp(error_str))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a time range [start, stop]
|
||||
pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> {
|
||||
let start_time = Self::parse_time(start)?;
|
||||
let stop_time = Self::parse_time(stop)?;
|
||||
if start_time > stop_time {
|
||||
return Err(Error::InvalidTimeRange(start.to_string(), stop.to_string()));
|
||||
}
|
||||
|
||||
Ok((start_time, stop_time))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -535,5 +770,210 @@ mod tests {
|
|||
assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo Eq Int32(42)]");
|
||||
}
|
||||
|
||||
// Nga todo: move all code and tests in influxdb_line_protocol/src/delete_parser.rs into this file in the next PR
|
||||
// The delete predicate
|
||||
#[test]
|
||||
fn test_time_range_valid() {
|
||||
let start = r#"100"#;
|
||||
let stop = r#"100"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (100, 100);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"200"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (100, 200);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"1970-01-01T00:00:00Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 0);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
// let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
// let stop = r#"now()"#; // -- Not working. Need to find a way to test this
|
||||
// let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
// let expected = (0, 0);
|
||||
// assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#;
|
||||
let stop = r#"100"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 100);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#;
|
||||
let stop = r#"1970-01-01T00:01:00Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 60000000000);
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_time_range_invalid() {
|
||||
let start = r#"100"#;
|
||||
let stop = r#"-100"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"50"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"1970-01-01T00:00:00Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"1971-09-01T00:00:10Z"#;
|
||||
let stop = r#"1971-09-01T00:00:05Z"#;
|
||||
let result = ParseDeletePredicate::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp() {
|
||||
let input = r#"123"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 123);
|
||||
|
||||
// must parse time
|
||||
let input = r#"1970-01-01T00:00:00Z"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 0);
|
||||
|
||||
let input = r#"1971-02-01T15:30:21Z"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 34270221000000000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp_negative() {
|
||||
let input = r#"-123"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, -123);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp_invalid() {
|
||||
// It turn out this is not invalid but return1 123
|
||||
let input = r#"123gdb"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 123);
|
||||
//assert!(time.is_err());
|
||||
|
||||
// must parse time
|
||||
// It turn out this is not invalid but return1 1970
|
||||
let input = r#"1970-01-01T00:00:00"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 1970);
|
||||
//assert!(time.is_err());
|
||||
|
||||
// It turn out this is not invalid but return1 1971
|
||||
let input = r#"1971-02-01:30:21Z"#;
|
||||
let time = ParseDeletePredicate::parse_time(input).unwrap();
|
||||
assert_eq!(time, 1971);
|
||||
//assert!(time.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_timestamp_out_of_range() {
|
||||
let input = r#"99999999999999999999999999999999"#;
|
||||
let time = ParseDeletePredicate::parse_time(input);
|
||||
assert!(time.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_predicate() {
|
||||
let table = "test";
|
||||
|
||||
let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#;
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred).unwrap();
|
||||
|
||||
println!("{:#?}", result);
|
||||
|
||||
let mut expected = vec![];
|
||||
let e = col("test.city").eq(lit("Boston"));
|
||||
expected.push(e);
|
||||
let val: i64 = 100;
|
||||
let e = col("test.cost").not_eq(lit(val));
|
||||
expected.push(e);
|
||||
let e = col("test.state").not_eq(lit("MA"));
|
||||
expected.push(e);
|
||||
let e = col("test.temp").eq(lit(87.5));
|
||||
expected.push(e);
|
||||
|
||||
assert_eq!(result, expected)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_predicate_invalid() {
|
||||
let table = "test";
|
||||
|
||||
let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost > 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost <= 100"#; // <
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost gt 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"city = cost = 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred() {
|
||||
let table = "test";
|
||||
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"200"#;
|
||||
let pred = r#"cost != 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred).unwrap();
|
||||
assert_eq!(result.start_time, 0);
|
||||
assert_eq!(result.stop_time, 200);
|
||||
|
||||
let mut expected = vec![];
|
||||
let num: i64 = 100;
|
||||
let e = col("test.cost").not_eq(lit(num));
|
||||
expected.push(e);
|
||||
assert_eq!(result.predicate, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred_invalid_time_range() {
|
||||
let table = "test";
|
||||
let start = r#"100"#;
|
||||
let stop = r#"50"#;
|
||||
let pred = r#"cost != 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred_invalid_pred() {
|
||||
let table = "test";
|
||||
let start = r#"100"#;
|
||||
let stop = r#"200"#;
|
||||
let pred = r#"cost > 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,7 @@ use std::time::Instant;
|
|||
use data_types::{server_id::ServerId, DatabaseName};
|
||||
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
|
||||
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
|
||||
use influxdb_line_protocol::delete_parser::ParseDeletePredicate;
|
||||
use query::predicate::PredicateBuilder;
|
||||
use query::predicate::{ParseDeletePredicate, PredicateBuilder};
|
||||
use query::QueryDatabase;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use server::{ApplicationState, ConnectionManager, Error, Server};
|
||||
|
|
Loading…
Reference in New Issue