refactor: API agnostic DML delete handler
Changes the DmlHandler::delete() trait method to accept required params, and accept a DeletePredicate instead of a HttpDeleteRequest so that it can be re-used in the gRPC handler.pull/24376/head
parent
362446b885
commit
2cd063698f
|
@ -1,13 +1,12 @@
|
||||||
use std::{collections::VecDeque, sync::Arc};
|
use std::{collections::VecDeque, sync::Arc};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::DatabaseName;
|
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
|
||||||
|
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use mutable_batch::MutableBatch;
|
use mutable_batch::MutableBatch;
|
||||||
use mutable_batch_lp::PayloadStatistics;
|
use mutable_batch_lp::PayloadStatistics;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use predicate::delete_predicate::HttpDeleteRequest;
|
|
||||||
use trace::ctx::SpanContext;
|
use trace::ctx::SpanContext;
|
||||||
|
|
||||||
use super::{DmlError, DmlHandler};
|
use super::{DmlError, DmlHandler};
|
||||||
|
@ -20,7 +19,11 @@ pub enum MockDmlHandlerCall {
|
||||||
payload_stats: PayloadStatistics,
|
payload_stats: PayloadStatistics,
|
||||||
body_len: usize,
|
body_len: usize,
|
||||||
},
|
},
|
||||||
Delete(HttpDeleteRequest),
|
Delete {
|
||||||
|
namespace: String,
|
||||||
|
table: String,
|
||||||
|
predicate: DeletePredicate,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
|
@ -89,11 +92,21 @@ impl DmlHandler for Arc<MockDmlHandler> {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete(
|
async fn delete<'a>(
|
||||||
&self,
|
&self,
|
||||||
delete: HttpDeleteRequest,
|
namespace: DatabaseName<'_>,
|
||||||
|
table: impl Into<String> + Send + Sync + 'a,
|
||||||
|
predicate: DeletePredicate,
|
||||||
_span_ctx: Option<SpanContext>,
|
_span_ctx: Option<SpanContext>,
|
||||||
) -> Result<(), DmlError> {
|
) -> Result<(), DmlError> {
|
||||||
record_and_return!(self, MockDmlHandlerCall::Delete(delete), delete_return)
|
record_and_return!(
|
||||||
|
self,
|
||||||
|
MockDmlHandlerCall::Delete {
|
||||||
|
namespace: namespace.into(),
|
||||||
|
table: table.into(),
|
||||||
|
predicate,
|
||||||
|
},
|
||||||
|
delete_return
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
//! A NOP implementation of [`DmlHandler`].
|
//! A NOP implementation of [`DmlHandler`].
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::DatabaseName;
|
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
|
||||||
|
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use mutable_batch::MutableBatch;
|
use mutable_batch::MutableBatch;
|
||||||
|
@ -29,12 +29,15 @@ impl DmlHandler for NopDmlHandler {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete(
|
async fn delete<'a>(
|
||||||
&self,
|
&self,
|
||||||
delete: predicate::delete_predicate::HttpDeleteRequest,
|
namespace: DatabaseName<'_>,
|
||||||
|
table: impl Into<String> + Send + Sync + 'a,
|
||||||
|
predicate: DeletePredicate,
|
||||||
_span_ctx: Option<SpanContext>,
|
_span_ctx: Option<SpanContext>,
|
||||||
) -> Result<(), DmlError> {
|
) -> Result<(), DmlError> {
|
||||||
info!(?delete, "dropping delete operation");
|
let table = table.into();
|
||||||
|
info!(%namespace, %table, ?predicate, "dropping delete operation");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::DatabaseName;
|
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
|
||||||
|
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use mutable_batch::MutableBatch;
|
use mutable_batch::MutableBatch;
|
||||||
use mutable_batch_lp::PayloadStatistics;
|
use mutable_batch_lp::PayloadStatistics;
|
||||||
use predicate::delete_predicate::HttpDeleteRequest;
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use trace::ctx::SpanContext;
|
use trace::ctx::SpanContext;
|
||||||
|
|
||||||
|
@ -37,9 +36,11 @@ pub trait DmlHandler: Debug + Send + Sync {
|
||||||
) -> Result<(), DmlError>;
|
) -> Result<(), DmlError>;
|
||||||
|
|
||||||
/// Delete the data specified in `delete`.
|
/// Delete the data specified in `delete`.
|
||||||
async fn delete(
|
async fn delete<'a>(
|
||||||
&self,
|
&self,
|
||||||
delete: HttpDeleteRequest,
|
namespace: DatabaseName<'_>,
|
||||||
|
table_name: impl Into<String> + Send + Sync + 'a,
|
||||||
|
predicate: DeletePredicate,
|
||||||
span_ctx: Option<SpanContext>,
|
span_ctx: Option<SpanContext>,
|
||||||
) -> Result<(), DmlError>;
|
) -> Result<(), DmlError>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode};
|
use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode};
|
||||||
use observability_deps::tracing::*;
|
use observability_deps::tracing::*;
|
||||||
use predicate::delete_predicate::parse_http_delete_request;
|
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use time::{SystemProvider, TimeProvider};
|
use time::{SystemProvider, TimeProvider};
|
||||||
|
@ -221,8 +221,15 @@ where
|
||||||
|
|
||||||
// Parse and extract table name (which can be empty), start, stop, and predicate
|
// Parse and extract table name (which can be empty), start, stop, and predicate
|
||||||
let parsed_delete = parse_http_delete_request(body)?;
|
let parsed_delete = parse_http_delete_request(body)?;
|
||||||
|
let predicate = parse_delete_predicate(
|
||||||
|
&parsed_delete.start_time,
|
||||||
|
&parsed_delete.stop_time,
|
||||||
|
&parsed_delete.predicate,
|
||||||
|
)?;
|
||||||
|
|
||||||
self.dml_handler.delete(parsed_delete, span_ctx).await?;
|
self.dml_handler
|
||||||
|
.delete(namespace, parsed_delete.table_name, predicate, span_ctx)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -586,11 +593,10 @@ mod tests {
|
||||||
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
|
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
|
||||||
dml_handler = [Ok(())],
|
dml_handler = [Ok(())],
|
||||||
want_result = Ok(_),
|
want_result = Ok(_),
|
||||||
want_dml_calls = [MockDmlHandlerCall::Delete(d)] => {
|
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, table, predicate}] => {
|
||||||
assert_eq!(d.table_name, "its_a_table");
|
assert_eq!(table, "its_a_table");
|
||||||
assert_eq!(d.start_time, "2021-04-01T14:00:00Z");
|
assert_eq!(namespace, "bananas_test");
|
||||||
assert_eq!(d.stop_time, "2021-04-02T14:00:00Z");
|
assert!(!predicate.exprs.is_empty());
|
||||||
assert_eq!(d.predicate, "location=Boston");
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -654,11 +660,10 @@ mod tests {
|
||||||
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
|
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
|
||||||
dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))],
|
dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))],
|
||||||
want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))),
|
want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))),
|
||||||
want_dml_calls = [MockDmlHandlerCall::Delete(d)] => {
|
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, table, predicate}] => {
|
||||||
assert_eq!(d.table_name, "its_a_table");
|
assert_eq!(table, "its_a_table");
|
||||||
assert_eq!(d.start_time, "2021-04-01T14:00:00Z");
|
assert_eq!(namespace, "bananas_test");
|
||||||
assert_eq!(d.stop_time, "2021-04-02T14:00:00Z");
|
assert!(!predicate.exprs.is_empty());
|
||||||
assert_eq!(d.predicate, "location=Boston");
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -668,11 +673,10 @@ mod tests {
|
||||||
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
|
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
|
||||||
dml_handler = [Err(DmlError::Internal("💣".into()))],
|
dml_handler = [Err(DmlError::Internal("💣".into()))],
|
||||||
want_result = Err(Error::DmlHandler(DmlError::Internal(_))),
|
want_result = Err(Error::DmlHandler(DmlError::Internal(_))),
|
||||||
want_dml_calls = [MockDmlHandlerCall::Delete(d)] => {
|
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, table, predicate}] => {
|
||||||
assert_eq!(d.table_name, "its_a_table");
|
assert_eq!(table, "its_a_table");
|
||||||
assert_eq!(d.start_time, "2021-04-01T14:00:00Z");
|
assert_eq!(namespace, "bananas_test");
|
||||||
assert_eq!(d.stop_time, "2021-04-02T14:00:00Z");
|
assert!(!predicate.exprs.is_empty());
|
||||||
assert_eq!(d.predicate, "location=Boston");
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue