test: finalize codin up delete http endpoints and end-to-end tests

pull/24376/head
Nga Tran 2021-10-01 12:15:00 -04:00
parent 53a45d89b7
commit ee94e9038a
4 changed files with 121 additions and 16 deletions

View File

@ -545,10 +545,16 @@ impl ParseDeletePredicate {
return Ok(vec![]);
}
// Since table-name can be empty, let assign it a random name to have sqlparser work
let mut table = "some_table";
if !table_name.is_empty() {
table = table_name;
}
// Now add this predicate string into a DELETE SQL to user sqlparser to parse it
// "DELETE FROM table_name WHERE predicate"
let mut sql = "DELETE FROM ".to_string();
sql.push_str(table_name);
sql.push_str(table);
sql.push_str(" WHERE ");
sql.push_str(predicate);
@ -596,7 +602,7 @@ impl ParseDeletePredicate {
table_name: String,
start_time: String,
stop_time: String,
predicate: String
predicate: String,
) -> Result<Predicate, Error> {
// parse time range and the predicate
let parse_delete_pred = ParseDeletePredicate::try_new(
@ -605,17 +611,17 @@ impl ParseDeletePredicate {
stop_time.as_str(),
predicate.as_str(),
)?;
let mut del_predicate = PredicateBuilder::new()
.table(table_name)
.timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time)
.build();
// Add the predicate binary expressions
for expr in parse_delete_pred.predicate {
del_predicate.exprs.push(expr);
}
Ok(del_predicate)
}

View File

@ -355,6 +355,11 @@ impl Db {
this
}
/// Return all table names of the DB
pub fn table_names(&self) -> Vec<String> {
self.catalog.table_names()
}
/// Allow persistence if database rules all it.
pub async fn unsuppress_persistence(&self) {
let mut guard = self.lifecycle_policy.lock().await;

View File

@ -41,7 +41,7 @@ use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use trace_http::ctx::TraceHeaderParser;
use crate::influxdb_ioxd::{http::metrics::LineProtocolMetrics};
use crate::influxdb_ioxd::http::metrics::LineProtocolMetrics;
use hyper::server::conn::{AddrIncoming, AddrStream};
use std::convert::Infallible;
use std::num::NonZeroI32;
@ -282,7 +282,7 @@ impl ApplicationError {
Self::ParsingLineProtocol { .. } => self.bad_request(),
Self::ParsingDelete { .. } => self.bad_request(),
Self::BuildingDeletePredicate { .. } => self.bad_request(),
Self::ExecutingDelete { .. } => self.internal_error(),
Self::ExecutingDelete { .. } => self.internal_error(),
Self::ReadingBodyAsGzip { .. } => self.bad_request(),
Self::ClientHangup { .. } => self.bad_request(),
Self::RouteNotFound { .. } => self.not_found(),
@ -402,6 +402,7 @@ where
(Method::GET, "/health") => health(),
(Method::GET, "/metrics") => handle_metrics(server.application.as_ref()),
(Method::POST, "/api/v2/write") => write(req, server.as_ref()).await,
(Method::POST, "/api/v2/delete") => delete(req, server.as_ref()).await,
(Method::GET, "/api/v3/query") => query(req, server.as_ref()).await,
(Method::GET, "/debug/pprof") => pprof_home(req).await,
(Method::GET, "/debug/pprof/profile") => pprof_profile(req).await,
@ -571,13 +572,10 @@ where
let Server {
app_server: server,
max_request_size,
// lp_metrics, NGA: this is for delete, no need lp metrics
..
} = server;
let max_request_size = *max_request_size;
let server = Arc::clone(server);
// let lp_metrics = Arc::clone(lp_metrics);
// Extract the DB name from the request
// db_name = orrID_bucketID
@ -604,14 +602,25 @@ where
// Validate that the database name is legit
let db = server.db(&db_name)?;
// Build delete predicate
let del_predicate =
ParseDeletePredicate::build_delete_predicate(table_name.clone(), start, stop, predicate)
.context(BuildingDeletePredicate { input: body })?;
//execute delete
db.delete(&table_name, Arc::new(del_predicate))
.await
.context(ExecutingDelete { input: body })?;
// Tables data will be deleted from
let mut tables = vec![];
if table_name.is_empty() {
tables = db.table_names();
} else {
tables.push(table_name);
}
// Execute delete
for table in tables {
db.delete(&table, Arc::new(del_predicate.clone()))
.await
.context(ExecutingDelete { input: body })?;
}
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -1055,6 +1064,88 @@ mod tests {
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn test_delete() {
// Set up server
let application = make_application();
let app_server = make_server(Arc::clone(&application));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.wait_for_init().await.unwrap();
app_server
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
// Set up client
let client = Client::new();
let bucket_name = "MyBucket";
let org_name = "MyOrg";
// Client requests delete something from an empty DB
let delete_line = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(delete_line)
.send()
.await;
check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
// Client writes data to the server
let lp_data = r#"h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000
h2o_temperature,location=Boston,state=MA surface_degrees=47.5,bottom_degrees=35 1617286224000000123"#;
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
// Check that the data got into the right bucket
let test_db = app_server
.db(&DatabaseName::new("MyOrg_MyBucket").unwrap())
.expect("Database exists");
let batches = run_query(Arc::clone(&test_db), "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+--------------------------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+--------------------------------+",
"| 35 | Boston | MA | 47.5 | 2021-04-01T14:10:24.000000123Z |",
"| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |",
"+----------------+--------------+-------+-----------------+--------------------------------+",
];
assert_batches_eq!(expected, &batches);
// Now delete something
let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"location=Boston"}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(delete_line)
.send()
.await;
check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
// query again and should not get the deleted data
let batches = run_query(test_db, "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+----------------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+----------------------+",
"| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |",
"+----------------+--------------+-------+-----------------+----------------------+",
];
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn test_write_metrics() {
let application = make_application();

View File

@ -7,7 +7,7 @@ use data_types::chunk_metadata::ChunkId;
use data_types::{server_id::ServerId, DatabaseName};
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
use predicate::predicate::{ParseDeletePredicate};
use predicate::predicate::ParseDeletePredicate;
use query::QueryDatabase;
use server::rules::ProvidedDatabaseRules;
use server::{ApplicationState, ConnectionManager, Error, Server};
@ -624,7 +624,10 @@ where
.map_err(default_server_error_handler)?;
let del_predicate_result = ParseDeletePredicate::build_delete_predicate(
table_name.clone(), start_time.clone(), stop_time.clone(), predicate.clone(),
table_name.clone(),
start_time.clone(),
stop_time.clone(),
predicate.clone(),
);
match del_predicate_result {
Err(_) => {