From efcdc8e50ff0592566fa16d9432c51988acc8aa6 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 1 Sep 2021 17:31:49 -0400 Subject: [PATCH] feat: implement delete parser --- influxdb_iox_client/src/client/management.rs | 4 +- influxdb_line_protocol/src/lib.rs | 61 +++++++++++++++++++- src/influxdb_ioxd/rpc/management.rs | 26 +++++++-- tests/end_to_end_cases/management_api.rs | 2 +- 4 files changed, 84 insertions(+), 9 deletions(-) diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 9034319de1..f9ceba8e43 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -955,8 +955,8 @@ impl Client { let start_time = start_time.into(); let stop_time = stop_time.into(); - // NGA todo: Should parse and validate start_time, stop_time, and delete_predicate here - // at in client or send them to the server and do the parsing and validation there? + // NGA question: Currently, the "parse and validate start_time, stop_time, and delete_predicate" is + // done in the server's delete function. Should do it here at the client side instead? self.inner .delete(DeleteRequest { db_name, diff --git a/influxdb_line_protocol/src/lib.rs b/influxdb_line_protocol/src/lib.rs index 1f607448f5..cd79eaaf6f 100644 --- a/influxdb_line_protocol/src/lib.rs +++ b/influxdb_line_protocol/src/lib.rs @@ -24,6 +24,7 @@ use nom::{ combinator::{map, opt, recognize}, multi::many0, sequence::{preceded, separated_pair, terminated, tuple}, + Parser, }; use observability_deps::tracing::debug; use smallvec::SmallVec; @@ -504,7 +505,7 @@ pub fn parse_lines(input: &str) -> impl Iterator>> }) } -/// Split `input` into invidividual lines to be parsed, based on the +/// Split `input` into individual lines to be parsed, based on the /// rules of the Line Protocol format. /// /// This code is more or less a direct port of the [Go implementation of @@ -717,6 +718,64 @@ pub fn timestamp(i: &str) -> IResult<&str, i64> { })(i) } +/// Parse delete predicate which is in form of conjunctive expressions with +/// columns being compared to literals using = or != operators. +/// Example: city = "Boston" AND building != 12 and month = 10 +/// This function is currently used to support Delete's gPRC API only. +/// In the future, when we support all kinds of delete predicates supported in +/// Datafusion, we may want to reuse the parser used by Datafusion to keep +/// things consistent +pub fn parse_delete_predicate(input: &str) -> Vec { + // Convert everything after "and" into an expression string + let expr_strings = delete_predicate_to_expr_string(input); + let mut exprs: Vec = vec![]; + for s in expr_strings { + let expr = parse_delete_expression(s); + exprs.push(expr); + } + exprs +} + +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct BinaryExpr { + pub column_name: String, + pub op: String, // either "=" or "!=" + pub literal: String, // constant +} + +/// parse one binary expression column_name = literal +fn parse_delete_expression(input: &str) -> BinaryExpr { + // See if this is an equality comparison + let expr = separated_pair(field_string_value, tag("="), field_value).parse(input); + if expr.is_err() { + // See if this is an inequality comparison + let expr = separated_pair(field_string_value, tag("!="), field_value).parse(input); + if let Err(e) = expr { + panic!("Invalid Delete Predicate: {}, {}", input, e); + } + + let expr = expr.unwrap(); + return BinaryExpr { + column_name: expr.1 .0.to_string(), + op: "!=".to_string(), + literal: expr.1 .1.to_string(), + }; + } + + let expr = expr.unwrap(); + BinaryExpr { + column_name: expr.1 .0.to_string(), + op: "!=".to_string(), + literal: expr.1 .1.to_string(), + } +} + +/// Split string after "and" into its own expression +fn delete_predicate_to_expr_string(input: &str) -> Vec<&str> { + input.to_lowercase(); + input.split("and").collect() +} + fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> { // https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format // For string field values, backslash is only used to escape itself(\) or double diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 1ce534c536..35039f59ef 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -4,8 +4,10 @@ use std::sync::Arc; use std::time::Instant; use data_types::{server_id::ServerId, DatabaseName}; +use datafusion::logical_plan::{col, lit}; use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound}; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; +use influxdb_line_protocol::parse_delete_predicate; use query::predicate::PredicateBuilder; use query::QueryDatabase; use server::rules::ProvidedDatabaseRules; @@ -561,7 +563,7 @@ where let DeleteRequest { db_name, table_name, - delete_predicate: _, + delete_predicate, start_time, stop_time, } = request.into_inner(); @@ -575,7 +577,7 @@ where // parse and validate delete predicate which is a conjunctive expressions // with columns being compared to literals using = or != operators - // NGA: todo + let predicates = parse_delete_predicate(delete_predicate.as_str()); // Validate that the database name is legit let db_name = DatabaseName::new(db_name).field("db_name")?; @@ -584,11 +586,25 @@ where .db(&db_name) .map_err(default_server_error_handler)?; - // Build the delete predicate that include all delete expression and time range - let del_predicate = PredicateBuilder::new() + // Build the delete predicate that include the table to be deleted, all delete expressions, and time range + let mut del_predicate = PredicateBuilder::new() + .table(table_name.clone()) .timestamp_range(start, stop) - //.add_expr // NGA todo: repeat to add delete expressions here .build(); + for expr in predicates { + let e = match expr.op.as_str() { + "=" => col(expr.column_name.as_str()).eq(lit(expr.literal)), + "!=" => col(expr.column_name.as_str()).eq(lit(expr.literal)), + _ => panic!( + "{} operator not supported in delete expression", + expr.op.as_str() + ), + }; + + del_predicate.exprs.push(e); + } + + // NGA todo: need to validate if the table and all of its columns in delete predicate are legit? db.delete(&table_name, &del_predicate) .await diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index f9c9ff3efe..cc560e37fd 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1387,7 +1387,7 @@ async fn test_delete() { let stop = "1000"; management_client - .delete(&db_name, table_name, delete_predicate, start, stop) // note that this function currently does nothing + .delete(&db_name, table_name, delete_predicate, start, stop) .await .unwrap();