feat: implement delete parser
parent
a4183de411
commit
efcdc8e50f
|
|
@ -955,8 +955,8 @@ impl Client {
|
|||
let start_time = start_time.into();
|
||||
let stop_time = stop_time.into();
|
||||
|
||||
// NGA todo: Should parse and validate start_time, stop_time, and delete_predicate here
|
||||
// at in client or send them to the server and do the parsing and validation there?
|
||||
// NGA question: Currently, the "parse and validate start_time, stop_time, and delete_predicate" is
|
||||
// done in the server's delete function. Should do it here at the client side instead?
|
||||
self.inner
|
||||
.delete(DeleteRequest {
|
||||
db_name,
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ use nom::{
|
|||
combinator::{map, opt, recognize},
|
||||
multi::many0,
|
||||
sequence::{preceded, separated_pair, terminated, tuple},
|
||||
Parser,
|
||||
};
|
||||
use observability_deps::tracing::debug;
|
||||
use smallvec::SmallVec;
|
||||
|
|
@ -504,7 +505,7 @@ pub fn parse_lines(input: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>>
|
|||
})
|
||||
}
|
||||
|
||||
/// Split `input` into invidividual lines to be parsed, based on the
|
||||
/// Split `input` into individual lines to be parsed, based on the
|
||||
/// rules of the Line Protocol format.
|
||||
///
|
||||
/// This code is more or less a direct port of the [Go implementation of
|
||||
|
|
@ -717,6 +718,64 @@ pub fn timestamp(i: &str) -> IResult<&str, i64> {
|
|||
})(i)
|
||||
}
|
||||
|
||||
/// Parse delete predicate which is in form of conjunctive expressions with
|
||||
/// columns being compared to literals using = or != operators.
|
||||
/// Example: city = "Boston" AND building != 12 and month = 10
|
||||
/// This function is currently used to support Delete's gPRC API only.
|
||||
/// In the future, when we support all kinds of delete predicates supported in
|
||||
/// Datafusion, we may want to reuse the parser used by Datafusion to keep
|
||||
/// things consistent
|
||||
pub fn parse_delete_predicate(input: &str) -> Vec<BinaryExpr> {
|
||||
// Convert everything after "and" into an expression string
|
||||
let expr_strings = delete_predicate_to_expr_string(input);
|
||||
let mut exprs: Vec<BinaryExpr> = vec![];
|
||||
for s in expr_strings {
|
||||
let expr = parse_delete_expression(s);
|
||||
exprs.push(expr);
|
||||
}
|
||||
exprs
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct BinaryExpr {
|
||||
pub column_name: String,
|
||||
pub op: String, // either "=" or "!="
|
||||
pub literal: String, // constant
|
||||
}
|
||||
|
||||
/// parse one binary expression column_name = literal
|
||||
fn parse_delete_expression(input: &str) -> BinaryExpr {
|
||||
// See if this is an equality comparison
|
||||
let expr = separated_pair(field_string_value, tag("="), field_value).parse(input);
|
||||
if expr.is_err() {
|
||||
// See if this is an inequality comparison
|
||||
let expr = separated_pair(field_string_value, tag("!="), field_value).parse(input);
|
||||
if let Err(e) = expr {
|
||||
panic!("Invalid Delete Predicate: {}, {}", input, e);
|
||||
}
|
||||
|
||||
let expr = expr.unwrap();
|
||||
return BinaryExpr {
|
||||
column_name: expr.1 .0.to_string(),
|
||||
op: "!=".to_string(),
|
||||
literal: expr.1 .1.to_string(),
|
||||
};
|
||||
}
|
||||
|
||||
let expr = expr.unwrap();
|
||||
BinaryExpr {
|
||||
column_name: expr.1 .0.to_string(),
|
||||
op: "!=".to_string(),
|
||||
literal: expr.1 .1.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Split string after "and" into its own expression
|
||||
fn delete_predicate_to_expr_string(input: &str) -> Vec<&str> {
|
||||
input.to_lowercase();
|
||||
input.split("and").collect()
|
||||
}
|
||||
|
||||
fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> {
|
||||
// https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
|
||||
// For string field values, backslash is only used to escape itself(\) or double
|
||||
|
|
|
|||
|
|
@ -4,8 +4,10 @@ use std::sync::Arc;
|
|||
use std::time::Instant;
|
||||
|
||||
use data_types::{server_id::ServerId, DatabaseName};
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
|
||||
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
|
||||
use influxdb_line_protocol::parse_delete_predicate;
|
||||
use query::predicate::PredicateBuilder;
|
||||
use query::QueryDatabase;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
|
|
@ -561,7 +563,7 @@ where
|
|||
let DeleteRequest {
|
||||
db_name,
|
||||
table_name,
|
||||
delete_predicate: _,
|
||||
delete_predicate,
|
||||
start_time,
|
||||
stop_time,
|
||||
} = request.into_inner();
|
||||
|
|
@ -575,7 +577,7 @@ where
|
|||
|
||||
// parse and validate delete predicate which is a conjunctive expressions
|
||||
// with columns being compared to literals using = or != operators
|
||||
// NGA: todo
|
||||
let predicates = parse_delete_predicate(delete_predicate.as_str());
|
||||
|
||||
// Validate that the database name is legit
|
||||
let db_name = DatabaseName::new(db_name).field("db_name")?;
|
||||
|
|
@ -584,11 +586,25 @@ where
|
|||
.db(&db_name)
|
||||
.map_err(default_server_error_handler)?;
|
||||
|
||||
// Build the delete predicate that include all delete expression and time range
|
||||
let del_predicate = PredicateBuilder::new()
|
||||
// Build the delete predicate that include the table to be deleted, all delete expressions, and time range
|
||||
let mut del_predicate = PredicateBuilder::new()
|
||||
.table(table_name.clone())
|
||||
.timestamp_range(start, stop)
|
||||
//.add_expr // NGA todo: repeat to add delete expressions here
|
||||
.build();
|
||||
for expr in predicates {
|
||||
let e = match expr.op.as_str() {
|
||||
"=" => col(expr.column_name.as_str()).eq(lit(expr.literal)),
|
||||
"!=" => col(expr.column_name.as_str()).eq(lit(expr.literal)),
|
||||
_ => panic!(
|
||||
"{} operator not supported in delete expression",
|
||||
expr.op.as_str()
|
||||
),
|
||||
};
|
||||
|
||||
del_predicate.exprs.push(e);
|
||||
}
|
||||
|
||||
// NGA todo: need to validate if the table and all of its columns in delete predicate are legit?
|
||||
|
||||
db.delete(&table_name, &del_predicate)
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -1387,7 +1387,7 @@ async fn test_delete() {
|
|||
let stop = "1000";
|
||||
|
||||
management_client
|
||||
.delete(&db_name, table_name, delete_predicate, start, stop) // note that this function currently does nothing
|
||||
.delete(&db_name, table_name, delete_predicate, start, stop)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue