From ee94e9038ac1e70d2cdc3abb7f4e7c02f41aa5e1 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 1 Oct 2021 12:15:00 -0400 Subject: [PATCH] test: finalize coding up delete http endpoints and end-to-end tests --- predicate/src/predicate.rs | 16 ++-- server/src/db.rs | 5 ++ src/influxdb_ioxd/http.rs | 109 +++++++++++++++++++++++++--- src/influxdb_ioxd/rpc/management.rs | 7 +- 4 files changed, 121 insertions(+), 16 deletions(-) diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index 4482ea44d0..6f166387ab 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -545,10 +545,16 @@ impl ParseDeletePredicate { return Ok(vec![]); } + // Since table-name can be empty, let's assign it a random name to have sqlparser work + let mut table = "some_table"; + if !table_name.is_empty() { + table = table_name; + } + // Now add this predicate string into a DELETE SQL to user sqlparser to parse it // "DELETE FROM table_name WHERE predicate" let mut sql = "DELETE FROM ".to_string(); - sql.push_str(table_name); + sql.push_str(table); sql.push_str(" WHERE "); sql.push_str(predicate); @@ -596,7 +602,7 @@ impl ParseDeletePredicate { table_name: String, start_time: String, stop_time: String, - predicate: String + predicate: String, ) -> Result { // parse time range and the predicate let parse_delete_pred = ParseDeletePredicate::try_new( @@ -605,17 +611,17 @@ impl ParseDeletePredicate { stop_time.as_str(), predicate.as_str(), )?; - + let mut del_predicate = PredicateBuilder::new() .table(table_name) .timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time) .build(); - + // Add the predicate binary expressions for expr in parse_delete_pred.predicate { del_predicate.exprs.push(expr); } - + Ok(del_predicate) } diff --git a/server/src/db.rs b/server/src/db.rs index 993001cdcf..609f788a10 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -355,6 +355,11 @@ impl Db { this } + /// Return all table names of the DB + pub fn 
table_names(&self) -> Vec { + self.catalog.table_names() + } + /// Allow persistence if database rules all it. pub async fn unsuppress_persistence(&self) { let mut guard = self.lifecycle_policy.lock().await; diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index dbeaf6f110..618544b41f 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -41,7 +41,7 @@ use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use trace_http::ctx::TraceHeaderParser; -use crate::influxdb_ioxd::{http::metrics::LineProtocolMetrics}; +use crate::influxdb_ioxd::http::metrics::LineProtocolMetrics; use hyper::server::conn::{AddrIncoming, AddrStream}; use std::convert::Infallible; use std::num::NonZeroI32; @@ -282,7 +282,7 @@ impl ApplicationError { Self::ParsingLineProtocol { .. } => self.bad_request(), Self::ParsingDelete { .. } => self.bad_request(), Self::BuildingDeletePredicate { .. } => self.bad_request(), - Self::ExecutingDelete { .. } => self.internal_error(), + Self::ExecutingDelete { .. } => self.internal_error(), Self::ReadingBodyAsGzip { .. } => self.bad_request(), Self::ClientHangup { .. } => self.bad_request(), Self::RouteNotFound { .. } => self.not_found(), @@ -402,6 +402,7 @@ where (Method::GET, "/health") => health(), (Method::GET, "/metrics") => handle_metrics(server.application.as_ref()), (Method::POST, "/api/v2/write") => write(req, server.as_ref()).await, + (Method::POST, "/api/v2/delete") => delete(req, server.as_ref()).await, (Method::GET, "/api/v3/query") => query(req, server.as_ref()).await, (Method::GET, "/debug/pprof") => pprof_home(req).await, (Method::GET, "/debug/pprof/profile") => pprof_profile(req).await, @@ -571,13 +572,10 @@ where let Server { app_server: server, max_request_size, - // lp_metrics, NGA: this is for delete, no need lp metrics .. 
} = server; - let max_request_size = *max_request_size; let server = Arc::clone(server); - // let lp_metrics = Arc::clone(lp_metrics); // Extract the DB name from the request // db_name = orrID_bucketID @@ -604,14 +602,25 @@ // Validate that the database name is legit let db = server.db(&db_name)?; + // Build delete predicate let del_predicate = ParseDeletePredicate::build_delete_predicate(table_name.clone(), start, stop, predicate) .context(BuildingDeletePredicate { input: body })?; - //execute delete - db.delete(&table_name, Arc::new(del_predicate)) - .await - .context(ExecutingDelete { input: body })?; + // Tables the data will be deleted from + let mut tables = vec![]; + if table_name.is_empty() { + tables = db.table_names(); + } else { + tables.push(table_name); + } + + // Execute delete + for table in tables { + db.delete(&table, Arc::new(del_predicate.clone())) + .await + .context(ExecutingDelete { input: body })?; + } Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -1055,6 +1064,88 @@ mod tests { assert_batches_eq!(expected, &batches); } + #[tokio::test] + async fn test_delete() { + // Set up server + let application = make_application(); + let app_server = make_server(Arc::clone(&application)); + app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); + app_server.wait_for_init().await.unwrap(); + app_server + .create_database(make_rules("MyOrg_MyBucket")) + .await + .unwrap(); + let server_url = test_server(application, Arc::clone(&app_server), None); + + // Set up client + let client = Client::new(); + let bucket_name = "MyBucket"; + let org_name = "MyOrg"; + + // Client requests to delete something from an empty DB + let delete_line = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#; + let response = client + .post(&format!( + "{}/api/v2/delete?bucket={}&org={}", + server_url, bucket_name, org_name + )) + .body(delete_line) + .send() + .await; + check_response("delete", response, 
StatusCode::NO_CONTENT, Some("")).await; + + // Client writes data to the server + let lp_data = r#"h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000 + h2o_temperature,location=Boston,state=MA surface_degrees=47.5,bottom_degrees=35 1617286224000000123"#; + let response = client + .post(&format!( + "{}/api/v2/write?bucket={}&org={}", + server_url, bucket_name, org_name + )) + .body(lp_data) + .send() + .await; + check_response("write", response, StatusCode::NO_CONTENT, Some("")).await; + + // Check that the data got into the right bucket + let test_db = app_server + .db(&DatabaseName::new("MyOrg_MyBucket").unwrap()) + .expect("Database exists"); + let batches = run_query(Arc::clone(&test_db), "select * from h2o_temperature").await; + let expected = vec![ + "+----------------+--------------+-------+-----------------+--------------------------------+", + "| bottom_degrees | location | state | surface_degrees | time |", + "+----------------+--------------+-------+-----------------+--------------------------------+", + "| 35 | Boston | MA | 47.5 | 2021-04-01T14:10:24.000000123Z |", + "| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |", + "+----------------+--------------+-------+-----------------+--------------------------------+", + ]; + assert_batches_eq!(expected, &batches); + + // Now delete something + let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"location=Boston"}"#; + let response = client + .post(&format!( + "{}/api/v2/delete?bucket={}&org={}", + server_url, bucket_name, org_name + )) + .body(delete_line) + .send() + .await; + check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await; + + // query again and should not get the deleted data + let batches = run_query(test_db, "select * from h2o_temperature").await; + let expected = vec![ + "+----------------+--------------+-------+-----------------+----------------------+", + "| 
bottom_degrees | location | state | surface_degrees | time |", + "+----------------+--------------+-------+-----------------+----------------------+", + "| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |", + "+----------------+--------------+-------+-----------------+----------------------+", + ]; + assert_batches_eq!(expected, &batches); + } + #[tokio::test] async fn test_write_metrics() { let application = make_application(); diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 28060a7208..7aac6806d7 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -7,7 +7,7 @@ use data_types::chunk_metadata::ChunkId; use data_types::{server_id::ServerId, DatabaseName}; use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound}; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; -use predicate::predicate::{ParseDeletePredicate}; +use predicate::predicate::ParseDeletePredicate; use query::QueryDatabase; use server::rules::ProvidedDatabaseRules; use server::{ApplicationState, ConnectionManager, Error, Server}; @@ -624,7 +624,10 @@ where .map_err(default_server_error_handler)?; let del_predicate_result = ParseDeletePredicate::build_delete_predicate( - table_name.clone(), start_time.clone(), stop_time.clone(), predicate.clone(), + table_name.clone(), + start_time.clone(), + stop_time.clone(), + predicate.clone(), ); match del_predicate_result { Err(_) => {