diff --git a/Cargo.lock b/Cargo.lock index 43e083d7d2..4c6a54c913 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1825,16 +1825,11 @@ dependencies = [ name = "influxdb_line_protocol" version = "0.1.0" dependencies = [ - "chrono", - "datafusion 0.1.0", "nom", "observability_deps", - "serde", "smallvec", "snafu", - "sqlparser", "test_helpers", - "thiserror", ] [[package]] @@ -3320,6 +3315,7 @@ dependencies = [ "datafusion_util", "futures", "hashbrown", + "influxdb_line_protocol", "internal_types", "itertools 0.10.1", "libc", @@ -3328,7 +3324,9 @@ dependencies = [ "parking_lot", "regex", "snafu", + "sqlparser", "test_helpers", + "thiserror", "tokio", "tokio-stream", "trace", diff --git a/influxdb_line_protocol/Cargo.toml b/influxdb_line_protocol/Cargo.toml index 3a483e7f2c..202000fa67 100644 --- a/influxdb_line_protocol/Cargo.toml +++ b/influxdb_line_protocol/Cargo.toml @@ -5,15 +5,10 @@ authors = ["Paul Dix "] edition = "2018" [dependencies] # In alphabetical order -chrono = "0.4" -datafusion = { path = "../datafusion" } nom = "6" smallvec = "1.2.0" snafu = "0.6.2" observability_deps = { path = "../observability_deps" } -serde = { version = "1.0", features = ["derive"] } -sqlparser = "0.10.0" -thiserror = "1.0.28" [dev-dependencies] # In alphabetical order -test_helpers = { path = "../test_helpers" } +test_helpers = { path = "../test_helpers" } \ No newline at end of file diff --git a/influxdb_line_protocol/src/delete_parser.rs b/influxdb_line_protocol/src/delete_parser.rs deleted file mode 100644 index abf676f57e..0000000000 --- a/influxdb_line_protocol/src/delete_parser.rs +++ /dev/null @@ -1,461 +0,0 @@ -//! parse string that should be done at client before sending to server - -#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)] -#![warn( - missing_copy_implementations, - missing_debug_implementations, - clippy::explicit_iter_loop, - clippy::use_self, - clippy::clone_on_ref_ptr -)] - -use datafusion::logical_plan::{lit, Column, Expr, Operator}; - -use sqlparser::{ - ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, - dialect::GenericDialect, - parser::Parser, -}; - -use thiserror::Error; - -use chrono::DateTime; - -use crate::timestamp; - -/// Parse Error -#[derive(Debug, Error)] -pub enum Error { - /// Invalid time format - #[error("Invalid timestamp: {}", .0)] - InvalidTimestamp(String), - - /// Invalid time range - #[error("Invalid time range: ({}, {})", .0, .1)] - InvalidTimeRange(String, String), - - /// Predicate syntax error - #[error("Invalid predicate syntax: ({})", .0)] - InvalidSyntax(String), - - /// Predicate semantics error - #[error("Invalid predicate semantics: ({})", .0)] - InvalidSemantics(String), - - /// Predicate include non supported expression - #[error("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", .0)] - NotSupportPredicate(String), -} - -/// Result type for Parser Cient -pub type Result = std::result::Result; - -/// Parser for Delete predicate and time range -#[derive(Debug, Clone)] -pub struct ParseDeletePredicate { - pub start_time: i64, - pub stop_time: i64, - // conjunctive predicate of binary expressions of = or != - pub predicate: Vec, -} - -impl ParseDeletePredicate { - /// Create a ParseDeletePredicate - pub fn new(start_time: i64, stop_time: i64, predicate: Vec) -> Self { - Self { - start_time, - stop_time, - predicate, - } - } - - /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server - pub fn try_new(table_name: &str, start: &str, stop: &str, predicate: &str) -> Result { - // parse and check time range - let (start_time, stop_time) = Self::parse_time_range(start, stop)?; - - // Parse the predicate - let delete_exprs = Self::parse_predicate(table_name, predicate)?; - - Ok(Self::new(start_time, stop_time, delete_exprs)) - } - - /// Parse the predicate and convert it into datafusion expression - /// A delete predicate is a conjunctive expression of many - /// binary expressions of 'colum = constant' or 'column != constant' - /// - pub fn parse_predicate(table_name: &str, predicate: &str) -> Result> { - if predicate.is_empty() { - return Ok(vec![]); - } - - // Now add this predicate string into a DELETE SQL to user sqlparser to parse it - // "DELETE FROM table_name WHERE predicate" - let mut sql = "DELETE FROM ".to_string(); - sql.push_str(table_name); - sql.push_str(" WHERE "); - sql.push_str(predicate); - - // parse the delete sql - let dialect = GenericDialect {}; - let ast = Parser::parse_sql(&dialect, sql.as_str()); - match ast { - Err(parse_err) => { - let error_str = format!("{}, {}", predicate, parse_err); - Err(Error::InvalidSyntax(error_str)) - } - Ok(mut stmt) => { - if stmt.len() != 1 { - return Err(Error::InvalidSemantics(predicate.to_string())); - } - - let stmt = stmt.pop(); - match stmt { - Some(Statement::Delete { - table_name: _, - selection: Some(expr), - .. - }) => { - // split this expr into smaller binary if any - let mut exprs = vec![]; - let split = Self::split_members(table_name, &expr, &mut exprs); - if !split { - return Err(Error::NotSupportPredicate(predicate.to_string())); - } - Ok(exprs) - } - _ => Err(Error::InvalidSemantics(predicate.to_string())), - } - } - } - } - - /// Recursively split all "AND" expressions into smaller ones - /// Example: "A AND B AND C" => [A, B, C] - /// Return false if not all of them are AND of binary expression of - /// "column_name = literal" or "column_name != literal" - /// - /// The split expressions will be converted into data fusion expressions - pub fn split_members( - table_name: &str, - predicate: &SqlParserExpr, - predicates: &mut Vec, - ) -> bool { - // The below code built to be compatible with - // https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go - match predicate { - SqlParserExpr::BinaryOp { - left, - op: BinaryOperator::And, - right, - } => { - if !Self::split_members(table_name, left, predicates) { - return false; - } - if !Self::split_members(table_name, right, predicates) { - return false; - } - } - SqlParserExpr::BinaryOp { left, op, right } => { - // Verify Operator - let op = match op { - BinaryOperator::Eq => Operator::Eq, - BinaryOperator::NotEq => Operator::NotEq, - _ => return false, - }; - - // verify if left is identifier (column name) - let column = match &**left { - SqlParserExpr::Identifier(Ident { - value, - quote_style: _, // all quotes are ignored as done in idpe - }) => Expr::Column(Column { - relation: Some(table_name.to_string()), - name: value.to_string(), - }), - _ => return false, // not a column name - }; - - // verify if right is a literal or an identifier (e.g column name) - let value = match &**right { - SqlParserExpr::Identifier(Ident { - value, - quote_style: _, - }) => lit(value.to_string()), - SqlParserExpr::Value(Value::DoubleQuotedString(value)) => { - lit(value.to_string()) - } - SqlParserExpr::Value(Value::SingleQuotedString(value)) => { - lit(value.to_string()) - } - SqlParserExpr::Value(Value::NationalStringLiteral(value)) => { - lit(value.to_string()) - } - SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()), - SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::() { - Ok(v) => lit(v), - Err(_) => lit(v.parse::().unwrap()), - }, - SqlParserExpr::Value(Value::Boolean(v)) => lit(*v), - _ => return false, // not a literal - }; - - let expr = Expr::BinaryExpr { - left: Box::new(column), - op, - right: Box::new(value), - }; - predicates.push(expr); - } - _ => return false, - } - - true - } - - /// Parse a time and return its time in nanosecond - pub fn parse_time(input: &str) -> Result { - // This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z - // See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame - let datetime_result = DateTime::parse_from_rfc3339(input); - match datetime_result { - Ok(datetime) => Ok(datetime.timestamp_nanos()), - Err(timestamp_err) => { - // See if it is in nanosecond form - let time_result = timestamp(input); - match time_result { - Ok((_, nano)) => Ok(nano), - Err(nano_err) => { - // wrong format, return both error - let error_str = format!("{}, {}", timestamp_err, nano_err); - Err(Error::InvalidTimestamp(error_str)) - } - } - } - } - } - - /// Parse a time range [start, stop] - pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> { - let start_time = Self::parse_time(start)?; - let stop_time = Self::parse_time(stop)?; - if start_time > stop_time { - return Err(Error::InvalidTimeRange(start.to_string(), stop.to_string())); - } - - Ok((start_time, stop_time)) - } -} - -#[cfg(test)] -mod test { - use datafusion::logical_plan::{col, lit}; - - use super::*; - - #[test] - fn test_time_range_valid() { - let start = r#"100"#; - let stop = r#"100"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (100, 100); - assert_eq!(result, expected); - - let start = r#"100"#; - let stop = r#"200"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (100, 200); - assert_eq!(result, expected); - - let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 - let stop = r#"1970-01-01T00:00:00Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (0, 0); - assert_eq!(result, expected); - - // let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 - // let stop = r#"now()"#; // -- Not working. Need to find a way to test this - // let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - // let expected = (0, 0); - // assert_eq!(result, expected); - - let start = r#"1970-01-01T00:00:00Z"#; - let stop = r#"100"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (0, 100); - assert_eq!(result, expected); - - let start = r#"1970-01-01T00:00:00Z"#; - let stop = r#"1970-01-01T00:01:00Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (0, 60000000000); - assert_eq!(result, expected); - } - - #[test] - fn test_time_range_invalid() { - let start = r#"100"#; - let stop = r#"-100"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - - let start = r#"100"#; - let stop = r#"50"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - - let start = r#"100"#; - let stop = r#"1970-01-01T00:00:00Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - - let start = r#"1971-09-01T00:00:10Z"#; - let stop = r#"1971-09-01T00:00:05Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - } - - #[test] - fn test_parse_timestamp() { - let input = r#"123"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 123); - - // must parse time - let input = r#"1970-01-01T00:00:00Z"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 0); - - let input = r#"1971-02-01T15:30:21Z"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 34270221000000000); - } - - #[test] - fn test_parse_timestamp_negative() { - let input = r#"-123"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, -123); - } - - #[test] - fn test_parse_timestamp_invalid() { - // It turn out this is not invalid but return1 123 - let input = r#"123gdb"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 123); - //assert!(time.is_err()); - - // must parse time - // It turn out this is not invalid but return1 1970 - let input = r#"1970-01-01T00:00:00"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 1970); - //assert!(time.is_err()); - - // It turn out this is not invalid but return1 1971 - let input = r#"1971-02-01:30:21Z"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 1971); - //assert!(time.is_err()); - } - - #[test] - fn test_parse_timestamp_out_of_range() { - let input = r#"99999999999999999999999999999999"#; - let time = ParseDeletePredicate::parse_time(input); - assert!(time.is_err()); - } - - #[test] - fn test_parse_predicate() { - let table = "test"; - - let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#; - let result = ParseDeletePredicate::parse_predicate(table, pred).unwrap(); - - println!("{:#?}", result); - - let mut expected = vec![]; - let e = col("test.city").eq(lit("Boston")); - expected.push(e); - let val: i64 = 100; - let e = col("test.cost").not_eq(lit(val)); - expected.push(e); - let e = col("test.state").not_eq(lit("MA")); - expected.push(e); - let e = col("test.temp").eq(lit(87.5)); - expected.push(e); - - assert_eq!(result, expected) - } - - #[test] - fn test_parse_predicate_invalid() { - let table = "test"; - - let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR - let result = ParseDeletePredicate::parse_predicate(table, pred); - assert!(result.is_err()); - - let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1 - let result = ParseDeletePredicate::parse_predicate(table, pred); - assert!(result.is_err()); - - let pred = r#"cost > 100"#; // > - let result = ParseDeletePredicate::parse_predicate(table, pred); - assert!(result.is_err()); - - let pred = r#"cost <= 100"#; // < - let result = ParseDeletePredicate::parse_predicate(table, pred); - assert!(result.is_err()); - - let pred = r#"cost gt 100"#; // > - let result = ParseDeletePredicate::parse_predicate(table, pred); - assert!(result.is_err()); - - let pred = r#"city = cost = 100"#; // > - let result = ParseDeletePredicate::parse_predicate(table, pred); - assert!(result.is_err()); - } - - #[test] - fn test_full_delete_pred() { - let table = "test"; - let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 - let stop = r#"200"#; - let pred = r#"cost != 100"#; - - let result = ParseDeletePredicate::try_new(table, start, stop, pred).unwrap(); - assert_eq!(result.start_time, 0); - assert_eq!(result.stop_time, 200); - - let mut expected = vec![]; - let num: i64 = 100; - let e = col("test.cost").not_eq(lit(num)); - expected.push(e); - assert_eq!(result.predicate, expected); - } - - #[test] - fn test_full_delete_pred_invalid_time_range() { - let table = "test"; - let start = r#"100"#; - let stop = r#"50"#; - let pred = r#"cost != 100"#; - - let result = ParseDeletePredicate::try_new(table, start, stop, pred); - assert!(result.is_err()); - } - - #[test] - fn test_full_delete_pred_invalid_pred() { - let table = "test"; - let start = r#"100"#; - let stop = r#"200"#; - let pred = r#"cost > 100"#; - - let result = ParseDeletePredicate::try_new(table, start, stop, pred); - assert!(result.is_err()); - } -} diff --git a/influxdb_line_protocol/src/lib.rs b/influxdb_line_protocol/src/lib.rs index b11f8b3ff8..fff3d9e24f 100644 --- a/influxdb_line_protocol/src/lib.rs +++ b/influxdb_line_protocol/src/lib.rs @@ -16,8 +16,6 @@ clippy::clone_on_ref_ptr )] -pub mod delete_parser; - use fmt::Display; use nom::{ branch::alt, diff --git a/query/Cargo.toml b/query/Cargo.toml index d03fb66b76..f8f65bd5a5 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -24,16 +24,20 @@ datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } futures = "0.3" hashbrown = "0.11" +influxdb_line_protocol = { path = "../influxdb_line_protocol" } internal_types = { path = "../internal_types" } metrics = { path = "../metrics" } +observability_deps = { path = "../observability_deps" } parking_lot = "0.11.2" regex = "1" snafu = "0.6.3" +sqlparser = "0.10.0" +thiserror = "1.0.28" tokio = { version = "1.11", features = ["macros"] } tokio-stream = "0.1.2" trace = { path = "../trace" } tracker = { path = "../tracker" } -observability_deps = { path = "../observability_deps" } + # use libc on unix like platforms to set worker priority in DedicatedExecutor [target."cfg(unix)".dependencies.libc] diff --git a/query/src/predicate.rs b/query/src/predicate.rs index 0ea9d71abe..5c89ffc127 100644 --- a/query/src/predicate.rs +++ b/query/src/predicate.rs @@ -11,12 +11,22 @@ use std::{ use data_types::timestamp::TimestampRange; use datafusion::{ error::DataFusionError, - logical_plan::{col, Expr, Operator}, + logical_plan::{col, lit, Column, Expr, Operator}, optimizer::utils, }; use datafusion_util::{make_range_expr, AndExprBuilder}; +use influxdb_line_protocol::timestamp; use internal_types::schema::TIME_COLUMN_NAME; use observability_deps::tracing::debug; +use sqlparser::{ + ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, + dialect::GenericDialect, + parser::Parser, +}; + +use thiserror::Error; + +use chrono::DateTime; /// This `Predicate` represents the empty predicate (aka that /// evaluates to true for all rows). @@ -393,6 +403,231 @@ impl PredicateBuilder { } } +// Parse Delete Predicates +/// Parse Error +#[derive(Debug, Error)] +pub enum Error { + /// Invalid time format + #[error("Invalid timestamp: {}", .0)] + InvalidTimestamp(String), + + /// Invalid time range + #[error("Invalid time range: ({}, {})", .0, .1)] + InvalidTimeRange(String, String), + + /// Predicate syntax error + #[error("Invalid predicate syntax: ({})", .0)] + InvalidSyntax(String), + + /// Predicate semantics error + #[error("Invalid predicate semantics: ({})", .0)] + InvalidSemantics(String), + + /// Predicate include non supported expression + #[error("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", .0)] + NotSupportPredicate(String), +} + +/// Result type for Parser Cient +pub type Result = std::result::Result; + +/// Parser for Delete predicate and time range +#[derive(Debug, Clone)] +pub struct ParseDeletePredicate { + pub start_time: i64, + pub stop_time: i64, + // conjunctive predicate of binary expressions of = or != + pub predicate: Vec, +} + +impl ParseDeletePredicate { + /// Create a ParseDeletePredicate + pub fn new(start_time: i64, stop_time: i64, predicate: Vec) -> Self { + Self { + start_time, + stop_time, + predicate, + } + } + + /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server + pub fn try_new(table_name: &str, start: &str, stop: &str, predicate: &str) -> Result { + // parse and check time range + let (start_time, stop_time) = Self::parse_time_range(start, stop)?; + + // Parse the predicate + let delete_exprs = Self::parse_predicate(table_name, predicate)?; + + Ok(Self::new(start_time, stop_time, delete_exprs)) + } + + /// Parse the predicate and convert it into datafusion expression + /// A delete predicate is a conjunctive expression of many + /// binary expressions of 'colum = constant' or 'column != constant' + /// + pub fn parse_predicate(table_name: &str, predicate: &str) -> Result> { + if predicate.is_empty() { + return Ok(vec![]); + } + + // Now add this predicate string into a DELETE SQL to user sqlparser to parse it + // "DELETE FROM table_name WHERE predicate" + let mut sql = "DELETE FROM ".to_string(); + sql.push_str(table_name); + sql.push_str(" WHERE "); + sql.push_str(predicate); + + // parse the delete sql + let dialect = GenericDialect {}; + let ast = Parser::parse_sql(&dialect, sql.as_str()); + match ast { + Err(parse_err) => { + let error_str = format!("{}, {}", predicate, parse_err); + Err(Error::InvalidSyntax(error_str)) + } + Ok(mut stmt) => { + if stmt.len() != 1 { + return Err(Error::InvalidSemantics(predicate.to_string())); + } + + let stmt = stmt.pop(); + match stmt { + Some(Statement::Delete { + table_name: _, + selection: Some(expr), + .. + }) => { + // split this expr into smaller binary if any + let mut exprs = vec![]; + let split = Self::split_members(table_name, &expr, &mut exprs); + if !split { + return Err(Error::NotSupportPredicate(predicate.to_string())); + } + Ok(exprs) + } + _ => Err(Error::InvalidSemantics(predicate.to_string())), + } + } + } + } + + /// Recursively split all "AND" expressions into smaller ones + /// Example: "A AND B AND C" => [A, B, C] + /// Return false if not all of them are AND of binary expression of + /// "column_name = literal" or "column_name != literal" + /// + /// The split expressions will be converted into data fusion expressions + pub fn split_members( + table_name: &str, + predicate: &SqlParserExpr, + predicates: &mut Vec, + ) -> bool { + // The below code built to be compatible with + // https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go + match predicate { + SqlParserExpr::BinaryOp { + left, + op: BinaryOperator::And, + right, + } => { + if !Self::split_members(table_name, left, predicates) { + return false; + } + if !Self::split_members(table_name, right, predicates) { + return false; + } + } + SqlParserExpr::BinaryOp { left, op, right } => { + // Verify Operator + let op = match op { + BinaryOperator::Eq => Operator::Eq, + BinaryOperator::NotEq => Operator::NotEq, + _ => return false, + }; + + // verify if left is identifier (column name) + let column = match &**left { + SqlParserExpr::Identifier(Ident { + value, + quote_style: _, // all quotes are ignored as done in idpe + }) => Expr::Column(Column { + relation: Some(table_name.to_string()), + name: value.to_string(), + }), + _ => return false, // not a column name + }; + + // verify if right is a literal or an identifier (e.g column name) + let value = match &**right { + SqlParserExpr::Identifier(Ident { + value, + quote_style: _, + }) => lit(value.to_string()), + SqlParserExpr::Value(Value::DoubleQuotedString(value)) => { + lit(value.to_string()) + } + SqlParserExpr::Value(Value::SingleQuotedString(value)) => { + lit(value.to_string()) + } + SqlParserExpr::Value(Value::NationalStringLiteral(value)) => { + lit(value.to_string()) + } + SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()), + SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::() { + Ok(v) => lit(v), + Err(_) => lit(v.parse::().unwrap()), + }, + SqlParserExpr::Value(Value::Boolean(v)) => lit(*v), + _ => return false, // not a literal + }; + + let expr = Expr::BinaryExpr { + left: Box::new(column), + op, + right: Box::new(value), + }; + predicates.push(expr); + } + _ => return false, + } + + true + } + + /// Parse a time and return its time in nanosecond + pub fn parse_time(input: &str) -> Result { + // This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z + // See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame + let datetime_result = DateTime::parse_from_rfc3339(input); + match datetime_result { + Ok(datetime) => Ok(datetime.timestamp_nanos()), + Err(timestamp_err) => { + // See if it is in nanosecond form + let time_result = timestamp(input); + match time_result { + Ok((_, nano)) => Ok(nano), + Err(nano_err) => { + // wrong format, return both error + let error_str = format!("{}, {}", timestamp_err, nano_err); + Err(Error::InvalidTimestamp(error_str)) + } + } + } + } + } + + /// Parse a time range [start, stop] + pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> { + let start_time = Self::parse_time(start)?; + let stop_time = Self::parse_time(stop)?; + if start_time > stop_time { + return Err(Error::InvalidTimeRange(start.to_string(), stop.to_string())); + } + + Ok((start_time, stop_time)) + } +} + #[cfg(test)] mod tests { use super::*; @@ -535,5 +770,210 @@ mod tests { assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo Eq Int32(42)]"); } - // Nga todo: move all code and tests in influxdb_line_protocol/src/delete_parser.rs into this file in the next PR + // The delete predicate + #[test] + fn test_time_range_valid() { + let start = r#"100"#; + let stop = r#"100"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (100, 100); + assert_eq!(result, expected); + + let start = r#"100"#; + let stop = r#"200"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (100, 200); + assert_eq!(result, expected); + + let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 + let stop = r#"1970-01-01T00:00:00Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (0, 0); + assert_eq!(result, expected); + + // let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 + // let stop = r#"now()"#; // -- Not working. Need to find a way to test this + // let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + // let expected = (0, 0); + // assert_eq!(result, expected); + + let start = r#"1970-01-01T00:00:00Z"#; + let stop = r#"100"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (0, 100); + assert_eq!(result, expected); + + let start = r#"1970-01-01T00:00:00Z"#; + let stop = r#"1970-01-01T00:01:00Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (0, 60000000000); + assert_eq!(result, expected); + } + + #[test] + fn test_time_range_invalid() { + let start = r#"100"#; + let stop = r#"-100"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + + let start = r#"100"#; + let stop = r#"50"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + + let start = r#"100"#; + let stop = r#"1970-01-01T00:00:00Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + + let start = r#"1971-09-01T00:00:10Z"#; + let stop = r#"1971-09-01T00:00:05Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + } + + #[test] + fn test_parse_timestamp() { + let input = r#"123"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 123); + + // must parse time + let input = r#"1970-01-01T00:00:00Z"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 0); + + let input = r#"1971-02-01T15:30:21Z"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 34270221000000000); + } + + #[test] + fn test_parse_timestamp_negative() { + let input = r#"-123"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, -123); + } + + #[test] + fn test_parse_timestamp_invalid() { + // It turn out this is not invalid but return1 123 + let input = r#"123gdb"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 123); + //assert!(time.is_err()); + + // must parse time + // It turn out this is not invalid but return1 1970 + let input = r#"1970-01-01T00:00:00"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 1970); + //assert!(time.is_err()); + + // It turn out this is not invalid but return1 1971 + let input = r#"1971-02-01:30:21Z"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 1971); + //assert!(time.is_err()); + } + + #[test] + fn test_parse_timestamp_out_of_range() { + let input = r#"99999999999999999999999999999999"#; + let time = ParseDeletePredicate::parse_time(input); + assert!(time.is_err()); + } + + #[test] + fn test_parse_predicate() { + let table = "test"; + + let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#; + let result = ParseDeletePredicate::parse_predicate(table, pred).unwrap(); + + println!("{:#?}", result); + + let mut expected = vec![]; + let e = col("test.city").eq(lit("Boston")); + expected.push(e); + let val: i64 = 100; + let e = col("test.cost").not_eq(lit(val)); + expected.push(e); + let e = col("test.state").not_eq(lit("MA")); + expected.push(e); + let e = col("test.temp").eq(lit(87.5)); + expected.push(e); + + assert_eq!(result, expected) + } + + #[test] + fn test_parse_predicate_invalid() { + let table = "test"; + + let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR + let result = ParseDeletePredicate::parse_predicate(table, pred); + assert!(result.is_err()); + + let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1 + let result = ParseDeletePredicate::parse_predicate(table, pred); + assert!(result.is_err()); + + let pred = r#"cost > 100"#; // > + let result = ParseDeletePredicate::parse_predicate(table, pred); + assert!(result.is_err()); + + let pred = r#"cost <= 100"#; // < + let result = ParseDeletePredicate::parse_predicate(table, pred); + assert!(result.is_err()); + + let pred = r#"cost gt 100"#; // > + let result = ParseDeletePredicate::parse_predicate(table, pred); + assert!(result.is_err()); + + let pred = r#"city = cost = 100"#; // > + let result = ParseDeletePredicate::parse_predicate(table, pred); + assert!(result.is_err()); + } + + #[test] + fn test_full_delete_pred() { + let table = "test"; + let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 + let stop = r#"200"#; + let pred = r#"cost != 100"#; + + let result = ParseDeletePredicate::try_new(table, start, stop, pred).unwrap(); + assert_eq!(result.start_time, 0); + assert_eq!(result.stop_time, 200); + + let mut expected = vec![]; + let num: i64 = 100; + let e = col("test.cost").not_eq(lit(num)); + expected.push(e); + assert_eq!(result.predicate, expected); + } + + #[test] + fn test_full_delete_pred_invalid_time_range() { + let table = "test"; + let start = r#"100"#; + let stop = r#"50"#; + let pred = r#"cost != 100"#; + + let result = ParseDeletePredicate::try_new(table, start, stop, pred); + assert!(result.is_err()); + } + + #[test] + fn test_full_delete_pred_invalid_pred() { + let table = "test"; + let start = r#"100"#; + let stop = r#"200"#; + let pred = r#"cost > 100"#; + + let result = ParseDeletePredicate::try_new(table, start, stop, pred); + assert!(result.is_err()); + } } diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index b299c4f8c9..cf07155183 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -6,8 +6,7 @@ use std::time::Instant; use data_types::{server_id::ServerId, DatabaseName}; use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound}; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; -use influxdb_line_protocol::delete_parser::ParseDeletePredicate; -use query::predicate::PredicateBuilder; +use query::predicate::{ParseDeletePredicate, PredicateBuilder}; use query::QueryDatabase; use server::rules::ProvidedDatabaseRules; use server::{ApplicationState, ConnectionManager, Error, Server};