From 2cd063698f9e0be4e62a00f14e3eac83d7c4162c Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 17 Jan 2022 16:25:08 +0000 Subject: [PATCH] refactor: API agnostic DML delete handler Changes the DmlHandler::delete() trait method to accept required params, and accept a DeletePredicate instead of a HttpDeleteRequest so that it can be re-used in the gRPC handler. --- router2/src/dml_handler/mock.rs | 25 ++++++++++++++++----- router2/src/dml_handler/nop.rs | 11 +++++---- router2/src/dml_handler/trait.rs | 9 ++++---- router2/src/server/http.rs | 38 ++++++++++++++++++-------------- 4 files changed, 52 insertions(+), 31 deletions(-) diff --git a/router2/src/dml_handler/mock.rs b/router2/src/dml_handler/mock.rs index 971b76372a..cae501df4a 100644 --- a/router2/src/dml_handler/mock.rs +++ b/router2/src/dml_handler/mock.rs @@ -1,13 +1,12 @@ use std::{collections::VecDeque, sync::Arc}; use async_trait::async_trait; -use data_types::DatabaseName; +use data_types::{delete_predicate::DeletePredicate, DatabaseName}; use hashbrown::HashMap; use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use parking_lot::Mutex; -use predicate::delete_predicate::HttpDeleteRequest; use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ -20,7 +19,11 @@ pub enum MockDmlHandlerCall { payload_stats: PayloadStatistics, body_len: usize, }, - Delete(HttpDeleteRequest), + Delete { + namespace: String, + table: String, + predicate: DeletePredicate, + }, } #[derive(Debug, Default)] @@ -89,11 +92,21 @@ impl DmlHandler for Arc { ) } - async fn delete( + async fn delete<'a>( &self, - delete: HttpDeleteRequest, + namespace: DatabaseName<'_>, + table: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, _span_ctx: Option, ) -> Result<(), DmlError> { - record_and_return!(self, MockDmlHandlerCall::Delete(delete), delete_return) + record_and_return!( + self, + MockDmlHandlerCall::Delete { + namespace: namespace.into(), + table: table.into(), + predicate, + }, + delete_return + ) } } diff --git a/router2/src/dml_handler/nop.rs b/router2/src/dml_handler/nop.rs index 4b9af23fb9..7d51ab1040 100644 --- a/router2/src/dml_handler/nop.rs +++ b/router2/src/dml_handler/nop.rs @@ -1,7 +1,7 @@ //! A NOP implementation of [`DmlHandler`]. use async_trait::async_trait; -use data_types::DatabaseName; +use data_types::{delete_predicate::DeletePredicate, DatabaseName}; use hashbrown::HashMap; use mutable_batch::MutableBatch; @@ -29,12 +29,15 @@ impl DmlHandler for NopDmlHandler { Ok(()) } - async fn delete( + async fn delete<'a>( &self, - delete: predicate::delete_predicate::HttpDeleteRequest, + namespace: DatabaseName<'_>, + table: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, _span_ctx: Option, ) -> Result<(), DmlError> { - info!(?delete, "dropping delete operation"); + let table = table.into(); + info!(%namespace, %table, ?predicate, "dropping delete operation"); Ok(()) } } diff --git a/router2/src/dml_handler/trait.rs b/router2/src/dml_handler/trait.rs index f2122690d0..fb352bf881 100644 --- a/router2/src/dml_handler/trait.rs +++ b/router2/src/dml_handler/trait.rs @@ -1,12 +1,11 @@ use std::fmt::Debug; use async_trait::async_trait; -use data_types::DatabaseName; +use data_types::{delete_predicate::DeletePredicate, DatabaseName}; use hashbrown::HashMap; use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; -use predicate::delete_predicate::HttpDeleteRequest; use thiserror::Error; use trace::ctx::SpanContext; @@ -37,9 +36,11 @@ pub trait DmlHandler: Debug + Send + Sync { ) -> Result<(), DmlError>; /// Delete the data specified in `delete`. - async fn delete( + async fn delete<'a>( &self, - delete: HttpDeleteRequest, + namespace: DatabaseName<'_>, + table_name: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, span_ctx: Option, ) -> Result<(), DmlError>; } diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 7fe809650e..e59de50d6c 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -8,7 +8,7 @@ use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; use futures::StreamExt; use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode}; use observability_deps::tracing::*; -use predicate::delete_predicate::parse_http_delete_request; +use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request}; use serde::Deserialize; use thiserror::Error; use time::{SystemProvider, TimeProvider}; @@ -221,8 +221,15 @@ where // Parse and extract table name (which can be empty), start, stop, and predicate let parsed_delete = parse_http_delete_request(body)?; + let predicate = parse_delete_predicate( + &parsed_delete.start_time, + &parsed_delete.stop_time, + &parsed_delete.predicate, + )?; - self.dml_handler.delete(parsed_delete, span_ctx).await?; + self.dml_handler + .delete(namespace, parsed_delete.table_name, predicate, span_ctx) + .await?; Ok(()) } @@ -586,11 +593,10 @@ mod tests { body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(), dml_handler = [Ok(())], want_result = Ok(_), - want_dml_calls = [MockDmlHandlerCall::Delete(d)] => { - assert_eq!(d.table_name, "its_a_table"); - assert_eq!(d.start_time, "2021-04-01T14:00:00Z"); - assert_eq!(d.stop_time, "2021-04-02T14:00:00Z"); - assert_eq!(d.predicate, "location=Boston"); + want_dml_calls = [MockDmlHandlerCall::Delete{namespace, table, predicate}] => { + assert_eq!(table, "its_a_table"); + assert_eq!(namespace, "bananas_test"); + assert!(!predicate.exprs.is_empty()); } ); @@ -654,11 +660,10 @@ mod tests { body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(), dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))], want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))), - want_dml_calls = [MockDmlHandlerCall::Delete(d)] => { - assert_eq!(d.table_name, "its_a_table"); - assert_eq!(d.start_time, "2021-04-01T14:00:00Z"); - assert_eq!(d.stop_time, "2021-04-02T14:00:00Z"); - assert_eq!(d.predicate, "location=Boston"); + want_dml_calls = [MockDmlHandlerCall::Delete{namespace, table, predicate}] => { + assert_eq!(table, "its_a_table"); + assert_eq!(namespace, "bananas_test"); + assert!(!predicate.exprs.is_empty()); } ); @@ -668,11 +673,10 @@ mod tests { body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(), dml_handler = [Err(DmlError::Internal("💣".into()))], want_result = Err(Error::DmlHandler(DmlError::Internal(_))), - want_dml_calls = [MockDmlHandlerCall::Delete(d)] => { - assert_eq!(d.table_name, "its_a_table"); - assert_eq!(d.start_time, "2021-04-01T14:00:00Z"); - assert_eq!(d.stop_time, "2021-04-02T14:00:00Z"); - assert_eq!(d.predicate, "location=Boston"); + want_dml_calls = [MockDmlHandlerCall::Delete{namespace, table, predicate}] => { + assert_eq!(table, "its_a_table"); + assert_eq!(namespace, "bananas_test"); + assert!(!predicate.exprs.is_empty()); } );