feat: Delete delete parsing code from router (#7573)

And return the "deletes unsupported" error sooner.

Co-authored-by: Dom <dom@itsallbroken.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Carol (Nichols || Goulding) 2023-04-18 05:57:02 -04:00 committed by GitHub
parent a06345c925
commit d60e4d5823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 34 additions and 828 deletions

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
@ -53,7 +53,6 @@ where
// All errors are converted into DML errors before returning to the caller
// in order to present a consistent error type for chained handlers.
type WriteError = DmlError;
type DeleteError = DmlError;
/// Write `batches` to `namespace`.
async fn write(
@ -74,30 +73,4 @@ where
.await
.map_err(Into::into)
}
/// Delete the data specified in `delete`.
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.first
.delete(
namespace,
namespace_id,
table_name,
predicate,
span_ctx.clone(),
)
.await
.map_err(Into::into)?;
self.second
.delete(namespace, namespace_id, table_name, predicate, span_ctx)
.await
.map_err(Into::into)
}
}

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, marker::PhantomData};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use futures::{stream::FuturesUnordered, TryStreamExt};
use trace::ctx::SpanContext;
@ -42,7 +42,6 @@ where
type WriteInput = I;
type WriteOutput = ();
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
/// Concurrently execute the write inputs in `input` against the inner
/// handler, returning early and aborting in-flight writes if an error
@ -70,18 +69,4 @@ where
.await?;
Ok(())
}
/// Pass the delete through to the inner handler.
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, namespace_id, table_name, predicate, span_ctx)
.await
}
}

View File

@ -1,11 +1,8 @@
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use trace::{
ctx::SpanContext,
span::{SpanExt, SpanRecorder},
};
use trace::{ctx::SpanContext, span::SpanRecorder};
use super::DmlHandler;
@ -20,9 +17,6 @@ pub struct InstrumentationDecorator<T, P = SystemProvider> {
write_success: DurationHistogram,
write_error: DurationHistogram,
delete_success: DurationHistogram,
delete_error: DurationHistogram,
}
impl<T> InstrumentationDecorator<T> {
@ -31,25 +25,16 @@ impl<T> InstrumentationDecorator<T> {
pub fn new(name: &'static str, registry: &metric::Registry, inner: T) -> Self {
let write: Metric<DurationHistogram> =
registry.register_metric("dml_handler_write_duration", "write handler call duration");
let delete: Metric<DurationHistogram> = registry.register_metric(
"dml_handler_delete_duration",
"delete handler call duration",
);
let write_success = write.recorder(&[("handler", name), ("result", "success")]);
let write_error = write.recorder(&[("handler", name), ("result", "error")]);
let delete_success = delete.recorder(&[("handler", name), ("result", "success")]);
let delete_error = delete.recorder(&[("handler", name), ("result", "error")]);
Self {
name,
inner,
time_provider: Default::default(),
write_success,
write_error,
delete_success,
delete_error,
}
}
}
@ -61,7 +46,6 @@ where
{
type WriteInput = T::WriteInput;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
type WriteOutput = T::WriteOutput;
/// Call the inner `write` method and record the call latency.
@ -100,43 +84,6 @@ where
res
}
/// Call the inner `delete` method and record the call latency.
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
let t = self.time_provider.now();
// Create a tracing span for this handler.
let mut span_recorder = SpanRecorder::new(span_ctx.child_span(self.name));
let res = self
.inner
.delete(namespace, namespace_id, table_name, predicate, span_ctx)
.await;
// Avoid exploding if time goes backwards - simply drop the measurement
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => {
span_recorder.ok("success");
self.delete_success.record(delta)
}
Err(e) => {
span_recorder.error(e.to_string());
self.delete_error.record(delta)
}
};
}
res
}
}
#[cfg(test)]
@ -144,7 +91,6 @@ mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::TimestampRange;
use metric::Attributes;
use trace::{span::SpanStatus, RingBufferTraceCollector, TraceCollector};
@ -234,57 +180,4 @@ mod tests {
assert_metric_hit(&metrics, "dml_handler_write_duration", "error");
assert_trace(traces, SpanStatus::Err);
}
#[tokio::test]
async fn test_delete_ok() {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(MockDmlHandler::<()>::default().with_delete_return([Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &metrics, handler);
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
};
decorator
.delete(&ns, NamespaceId::new(42), "a table", &pred, Some(span))
.await
.expect("inner handler configured to succeed");
assert_metric_hit(&metrics, "dml_handler_delete_duration", "success");
assert_trace(traces, SpanStatus::Ok);
}
#[tokio::test]
async fn test_delete_err() {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(
MockDmlHandler::<()>::default()
.with_delete_return([Err(DmlError::NamespaceNotFound("nope".to_owned()))]),
);
let metrics = Arc::new(metric::Registry::default());
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &metrics, handler);
let pred = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
};
decorator
.delete(&ns, NamespaceId::new(42), "a table", &pred, Some(span))
.await
.expect_err("inner handler configured to fail");
assert_metric_hit(&metrics, "dml_handler_delete_duration", "error");
assert_trace(traces, SpanStatus::Err);
}
}

View File

@ -1,7 +1,7 @@
use std::{collections::VecDeque, fmt::Debug};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use parking_lot::Mutex;
use trace::ctx::SpanContext;
@ -16,19 +16,12 @@ pub enum MockDmlHandlerCall<W> {
namespace_id: NamespaceId,
write_input: W,
},
Delete {
namespace: String,
namespace_id: NamespaceId,
table: String,
predicate: DeletePredicate,
},
}
#[derive(Debug)]
struct Inner<W> {
calls: Vec<MockDmlHandlerCall<W>>,
write_return: VecDeque<Result<(), DmlError>>,
delete_return: VecDeque<Result<(), DmlError>>,
}
impl<W> Default for Inner<W> {
@ -36,7 +29,6 @@ impl<W> Default for Inner<W> {
Self {
calls: Default::default(),
write_return: Default::default(),
delete_return: Default::default(),
}
}
}
@ -65,11 +57,6 @@ where
self
}
pub fn with_delete_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
self.0.lock().delete_return = ret.into();
self
}
pub fn calls(&self) -> Vec<MockDmlHandlerCall<W>> {
self.0.lock().calls.clone()
}
@ -93,7 +80,6 @@ where
W: Debug + Send + Sync,
{
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = W;
type WriteOutput = ();
@ -114,24 +100,4 @@ where
write_return
)
}
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
record_and_return!(
self,
MockDmlHandlerCall::Delete {
namespace: namespace.into(),
namespace_id,
table: table_name.to_owned(),
predicate: predicate.clone(),
},
delete_return
)
}
}

View File

@ -3,7 +3,7 @@
use std::{fmt::Debug, marker::PhantomData};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use observability_deps::tracing::*;
use trace::ctx::SpanContext;
@ -25,7 +25,6 @@ where
T: Debug + Send + Sync,
{
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = T;
type WriteOutput = T;
@ -39,16 +38,4 @@ where
info!(%namespace, %namespace_id, ?batches, "dropping write operation");
Ok(batches)
}
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
info!(%namespace, %namespace_id, %table_name, ?predicate, "dropping delete operation");
Ok(())
}
}

View File

@ -1,7 +1,5 @@
use async_trait::async_trait;
use data_types::{
DeletePredicate, NamespaceId, NamespaceName, PartitionKey, PartitionTemplate, TableId,
};
use data_types::{NamespaceId, NamespaceName, PartitionKey, PartitionTemplate, TableId};
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use observability_deps::tracing::*;
@ -64,7 +62,6 @@ impl Partitioner {
#[async_trait]
impl DmlHandler for Partitioner {
type WriteError = PartitionError;
type DeleteError = PartitionError;
type WriteInput = HashMap<TableId, (String, MutableBatch)>;
type WriteOutput = Vec<Partitioned<Self::WriteInput>>;
@ -104,18 +101,6 @@ impl DmlHandler for Partitioner {
.map(|(key, batch)| Partitioned::new(key, batch))
.collect::<Vec<_>>())
}
/// Pass the delete request through unmodified to the next handler.
async fn delete(
&self,
_namespace: &NamespaceName<'static>,
_namespace_id: NamespaceId,
_table_name: &str,
_predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
Ok(())
}
}
#[cfg(test)]

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use hashbrown::HashMap;
use iox_time::{SystemProvider, TimeProvider};
use mutable_batch::MutableBatch;
@ -53,7 +53,6 @@ where
C: NamespaceCache<ReadError = iox_catalog::interface::Error>, // The handler expects the cache to read from the catalog if necessary.
{
type WriteError = RetentionError;
type DeleteError = RetentionError;
type WriteInput = HashMap<String, MutableBatch>;
type WriteOutput = Self::WriteInput;
@ -87,18 +86,6 @@ where
Ok(batch)
}
/// Pass the delete request through unmodified to the next handler.
async fn delete(
&self,
_namespace: &NamespaceName<'static>,
_namespace_id: NamespaceId,
_table_name: &str,
_predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
Ok(())
}
}
#[cfg(test)]

View File

@ -16,7 +16,7 @@ use self::{
use super::{DmlHandler, Partitioned};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName, TableId};
use data_types::{NamespaceId, NamespaceName, TableId};
use dml::{DmlMeta, DmlWrite};
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use hashbrown::HashMap;
@ -51,10 +51,6 @@ pub enum RpcWriteError {
#[error("upstream {0} is not connected")]
UpstreamNotConnected(String),
/// A delete request was rejected (not supported).
#[error("deletes are not supported")]
DeletesUnsupported,
/// The write request was not attempted, because not enough upstream
/// ingesters needed to satisfy the configured replication factor are
/// healthy.
@ -176,7 +172,6 @@ where
type WriteOutput = Vec<DmlMeta>;
type WriteError = RpcWriteError;
type DeleteError = RpcWriteError;
async fn write(
&self,
@ -265,24 +260,6 @@ where
Ok(vec![op.meta().clone()])
}
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
_predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteError> {
warn!(
%namespace,
%namespace_id,
%table_name,
"dropping delete request"
);
Err(RpcWriteError::DeletesUnsupported)
}
}
/// Perform an RPC write with `req` against one of the upstream ingesters in

View File

@ -195,7 +195,7 @@ mod tests {
async fn test_observe() {
let circuit_breaker = Arc::new(MockCircuitBreaker::default());
let mock_client = Arc::new(MockWriteClient::default().with_ret(Box::new(
[Ok(()), Err(RpcWriteError::DeletesUnsupported)].into_iter(),
[Ok(()), Err(RpcWriteError::NoUpstreams)].into_iter(),
)));
let wrapper = CircuitBreakingClient::new(Arc::clone(&mock_client), "bananas")
.with_circuit_breaker(Arc::clone(&circuit_breaker));

View File

@ -128,7 +128,6 @@ fn is_envoy_unavailable_error(e: &RpcWriteError) -> bool {
| RpcWriteError::Timeout(_)
| RpcWriteError::NoUpstreams
| RpcWriteError::UpstreamNotConnected(_)
| RpcWriteError::DeletesUnsupported
| RpcWriteError::PartialWrite { .. }
| RpcWriteError::NotEnoughReplicas => false,
}

View File

@ -1,7 +1,7 @@
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName, NamespaceSchema, TableId};
use data_types::{NamespaceId, NamespaceName, NamespaceSchema, TableId};
use hashbrown::HashMap;
use iox_catalog::{
interface::{Catalog, Error as CatalogError},
@ -145,7 +145,6 @@ where
C: NamespaceCache<ReadError = iox_catalog::interface::Error>, // The handler expects the cache to read from the catalog if necessary.
{
type WriteError = SchemaError;
type DeleteError = SchemaError;
// Accepts a map of TableName -> MutableBatch
type WriteInput = HashMap<String, MutableBatch>;
@ -315,19 +314,6 @@ where
Ok(batches)
}
/// This call is passed through to `D` - no schema validation is performed
/// on deletes.
async fn delete(
&self,
_namespace: &NamespaceName<'static>,
_namespace_id: NamespaceId,
_table_name: &str,
_predicate: &DeletePredicate,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
Ok(())
}
}
/// An error returned by schema limit evaluation against a cached
@ -461,7 +447,7 @@ mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::{ColumnType, TimestampRange};
use data_types::ColumnType;
use iox_tests::{TestCatalog, TestNamespace};
use once_cell::sync::Lazy;
@ -956,29 +942,4 @@ mod tests {
assert_matches!(err, SchemaError::ServiceLimit(_));
assert_eq!(1, handler.service_limit_hit_columns.fetch());
}
#[tokio::test]
async fn test_write_delete_passthrough_ok() {
const NAMESPACE: &str = "NAMESPACE_IS_NOT_VALIDATED";
const TABLE: &str = "bananas";
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
let predicate = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
};
let ns = NamespaceName::try_from(NAMESPACE).unwrap();
handler
.delete(&ns, NamespaceId::new(42), TABLE, &predicate, None)
.await
.expect("request should succeed");
// Deletes have no effect on the cache.
assert!(handler.cache.get_schema(&ns).await.is_err());
}
}

View File

@ -2,7 +2,7 @@ use super::{
partitioner::PartitionError, retention_validation::RetentionError, RpcWriteError, SchemaError,
};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName};
use std::{error::Error, fmt::Debug, sync::Arc};
use thiserror::Error;
use trace::ctx::SpanContext;
@ -57,9 +57,6 @@ pub trait DmlHandler: Debug + Send + Sync {
/// All errors must be mappable into the concrete [`DmlError`] type.
type WriteError: Error + Into<DmlError> + Send;
/// The error type of the delete handler.
type DeleteError: Error + Into<DmlError> + Send;
/// Write `batches` to `namespace`.
async fn write(
&self,
@ -68,16 +65,6 @@ pub trait DmlHandler: Debug + Send + Sync {
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError>;
/// Delete the data specified in `delete`.
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError>;
}
#[async_trait]
@ -88,7 +75,6 @@ where
type WriteInput = T::WriteInput;
type WriteOutput = T::WriteOutput;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
async fn write(
&self,
@ -101,18 +87,4 @@ where
.write(namespace, namespace_id, input, span_ctx)
.await
}
/// Delete the data specified in `delete`.
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
(**self)
.delete(namespace, namespace_id, table_name, predicate, span_ctx)
.await
}
}

View File

@ -1,6 +1,5 @@
//! HTTP service implementations for `router`.
mod delete_predicate;
pub mod write;
use authz::{Action, Authorizer, Permission, Resource};
@ -13,19 +12,15 @@ use metric::{DurationHistogram, U64Counter};
use mutable_batch::MutableBatch;
use mutable_batch_lp::LinesConverter;
use observability_deps::tracing::*;
use predicate::delete_predicate::parse_delete_predicate;
use server_util::authorization::AuthorizationHeaderExtension;
use std::{str::Utf8Error, sync::Arc, time::Instant};
use thiserror::Error;
use tokio::sync::{Semaphore, TryAcquireError};
use trace::ctx::SpanContext;
use self::{
delete_predicate::parse_http_delete_request,
write::{
multi_tenant::MultiTenantExtractError, single_tenant::SingleTenantExtractError,
WriteParamExtractor, WriteParams,
},
use self::write::{
multi_tenant::MultiTenantExtractError, single_tenant::SingleTenantExtractError,
WriteParamExtractor, WriteParams,
};
use crate::{
dml_handlers::{
@ -41,6 +36,10 @@ pub enum Error {
#[error("not found")]
NoHandler,
/// A delete request was rejected (not supported).
#[error("deletes are not supported")]
DeletesUnsupported,
/// An error parsing a single-tenant HTTP request.
#[error(transparent)]
SingleTenantError(#[from] SingleTenantExtractError),
@ -77,14 +76,6 @@ pub enum Error {
#[error("failed to parse line protocol: {0}")]
ParseLineProtocol(mutable_batch_lp::Error),
/// Failure to parse the request delete predicate.
#[error("failed to parse delete predicate: {0}")]
ParseDelete(#[from] predicate::delete_predicate::Error),
/// Failure to parse the delete predicate in the http request
#[error("failed to parse delete predicate from http request: {0}")]
ParseHttpDelete(#[from] self::delete_predicate::Error),
/// An error returned from the [`DmlHandler`].
#[error("dml handler error: {0}")]
DmlHandler(#[from] DmlError),
@ -120,13 +111,12 @@ impl Error {
pub fn as_status_code(&self) -> StatusCode {
match self {
Error::NoHandler => StatusCode::NOT_FOUND,
Error::DeletesUnsupported => StatusCode::NOT_IMPLEMENTED,
Error::ClientHangup(_) => StatusCode::BAD_REQUEST,
Error::InvalidGzip(_) => StatusCode::BAD_REQUEST,
Error::NonUtf8ContentHeader(_) => StatusCode::BAD_REQUEST,
Error::NonUtf8Body(_) => StatusCode::BAD_REQUEST,
Error::ParseLineProtocol(_) => StatusCode::BAD_REQUEST,
Error::ParseDelete(_) => StatusCode::BAD_REQUEST,
Error::ParseHttpDelete(_) => StatusCode::BAD_REQUEST,
Error::RequestSizeExceeded(_) => StatusCode::PAYLOAD_TOO_LARGE,
Error::InvalidContentEncoding(_) => {
// https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
@ -185,7 +175,6 @@ impl From<&DmlError> for StatusCode {
}
DmlError::Retention(RetentionError::OutsideRetention(_)) => StatusCode::FORBIDDEN,
DmlError::RpcWrite(RpcWriteError::Upstream(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED,
DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
DmlError::RpcWrite(
RpcWriteError::NoUpstreams
@ -226,7 +215,6 @@ pub struct HttpDelegate<D, N, T = SystemProvider> {
write_metric_fields: U64Counter,
write_metric_tables: U64Counter,
write_metric_body_size: U64Counter,
delete_metric_body_size: U64Counter,
request_limit_rejected: U64Counter,
}
@ -269,12 +257,6 @@ impl<D, N> HttpDelegate<D, N, SystemProvider> {
"cumulative byte size of successfully routed (decompressed) line protocol write requests",
)
.recorder(&[]);
let delete_metric_body_size = metrics
.register_metric::<U64Counter>(
"http_delete_body_bytes",
"cumulative byte size of successfully routed (decompressed) delete requests",
)
.recorder(&[]);
let request_limit_rejected = metrics
.register_metric::<U64Counter>(
"http_request_limit_rejected",
@ -301,7 +283,6 @@ impl<D, N> HttpDelegate<D, N, SystemProvider> {
write_metric_fields,
write_metric_tables,
write_metric_body_size,
delete_metric_body_size,
request_limit_rejected,
}
}
@ -343,7 +324,7 @@ where
let dml_info = self.write_param_extractor.parse_v2(&req)?;
self.write_handler(req, dml_info).await
}
(&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
(&Method::POST, "/api/v2/delete") => return Err(Error::DeletesUnsupported),
_ => return Err(Error::NoHandler),
}
.map(|_summary| {
@ -440,55 +421,6 @@ where
Ok(())
}
async fn delete_handler(&self, req: Request<Body>) -> Result<(), Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let write_info = self.write_param_extractor.parse_v2(&req)?;
trace!(namespace=%write_info.namespace, "processing delete request");
// Read the HTTP body and convert it to a str.
let body = self.read_body(req).await?;
let body = std::str::from_utf8(&body).map_err(Error::NonUtf8Body)?;
// Parse and extract table name (which can be empty), start, stop, and predicate
let parsed_delete = parse_http_delete_request(body)?;
let predicate = parse_delete_predicate(
&parsed_delete.start_time,
&parsed_delete.stop_time,
&parsed_delete.predicate,
)?;
debug!(
table_name=%parsed_delete.table_name,
predicate = %parsed_delete.predicate,
start=%parsed_delete.start_time,
stop=%parsed_delete.stop_time,
body_size=body.len(),
namespace=%write_info.namespace,
"routing delete"
);
let namespace_id = self
.namespace_resolver
.get_namespace_id(&write_info.namespace)
.await?;
self.dml_handler
.delete(
&write_info.namespace,
namespace_id,
parsed_delete.table_name.as_str(),
&predicate,
span_ctx,
)
.await
.map_err(Into::into)?;
self.delete_metric_body_size.inc(body.len() as _);
Ok(())
}
/// Parse the request's body into raw bytes, applying the configured size
/// limits and decoding any content encoding.
async fn read_body(&self, req: hyper::Request<Body>) -> Result<Bytes, Error> {
@ -687,7 +619,6 @@ mod tests {
.with_mapping(NAMESPACE_NAME, NAMESPACE_ID);
let dml_handler = Arc::new(MockDmlHandler::default()
.with_write_return($dml_write_handler)
.with_delete_return($dml_delete_handler)
);
let metrics = Arc::new(metric::Registry::default());
let delegate = HttpDelegate::new(
@ -773,30 +704,6 @@ mod tests {
};
}
// Wrapper over test_http_handler specifically for delete requests.
macro_rules! test_delete_handler {
(
$name:ident,
query_string = $query_string:expr, // Request URI query string
body = $body:expr, // Request body content
dml_handler = $dml_handler:expr, // DML delete handler response (if called)
want_result = $want_result:pat,
want_dml_calls = $($want_dml_calls:tt )+
) => {
paste::paste! {
test_http_handler!(
[<delete_ $name>],
uri = format!("https://bananas.example/api/v2/delete{}", $query_string),
body = $body,
dml_write_handler = [],
dml_delete_handler = $dml_handler,
want_result = $want_result,
want_dml_calls = $($want_dml_calls)+
);
}
};
}
test_write_handler!(
ok,
query_string = "?org=bananas&bucket=test",
@ -1029,118 +936,6 @@ mod tests {
want_dml_calls = []
);
test_delete_handler!(
ok,
query_string = "?org=bananas&bucket=test",
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, namespace_id, table, predicate}] => {
assert_eq!(table, "its_a_table");
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
assert!(!predicate.exprs.is_empty());
}
);
test_delete_handler!(
invalid_delete_body,
query_string = "?org=bananas&bucket=test",
body = r#"{wat}"#.as_bytes(),
dml_handler = [],
want_result = Err(Error::ParseHttpDelete(_)),
want_dml_calls = []
);
test_delete_handler!(
no_query_params,
query_string = "",
body = "".as_bytes(),
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::ParseV2Request(V2WriteParseError::NoQueryParams)
)),
want_dml_calls = [] // None
);
test_delete_handler!(
no_org_bucket,
query_string = "?",
body = "".as_bytes(),
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::InvalidOrgAndBucket(
OrgBucketMappingError::NoOrgBucketSpecified
)
)),
want_dml_calls = [] // None
);
test_delete_handler!(
empty_org_bucket,
query_string = "?org=&bucket=",
body = "".as_bytes(),
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::InvalidOrgAndBucket(
OrgBucketMappingError::NoOrgBucketSpecified
)
)),
want_dml_calls = [] // None
);
test_delete_handler!(
invalid_org_bucket,
query_string = format!("?org=test&bucket={}", "A".repeat(1000)),
body = "".as_bytes(),
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::InvalidOrgAndBucket(
OrgBucketMappingError::InvalidNamespaceName(
NamespaceNameError::LengthConstraint { .. }
)
)
)),
want_dml_calls = [] // None
);
test_delete_handler!(
non_utf8_body,
query_string = "?org=bananas&bucket=test",
body = vec![0xc3, 0x28],
dml_handler = [Ok(())],
want_result = Err(Error::NonUtf8Body(_)),
want_dml_calls = [] // None
);
test_delete_handler!(
db_not_found,
query_string = "?org=bananas&bucket=test",
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
dml_handler = [Err(DmlError::NamespaceNotFound(NAMESPACE_NAME.to_string()))],
want_result = Err(Error::DmlHandler(DmlError::NamespaceNotFound(_))),
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, namespace_id, table, predicate}] => {
assert_eq!(table, "its_a_table");
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
assert!(!predicate.exprs.is_empty());
}
);
test_delete_handler!(
dml_handler_error,
query_string = "?org=bananas&bucket=test",
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
dml_handler = [Err(DmlError::Internal("💣".into()))],
want_result = Err(Error::DmlHandler(DmlError::Internal(_))),
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, namespace_id, table, predicate}] => {
assert_eq!(table, "its_a_table");
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
assert!(!predicate.exprs.is_empty());
}
);
test_http_handler!(
not_found,
uri = "https://bananas.example/wat",
@ -1429,11 +1224,7 @@ mod tests {
let mock_namespace_resolver =
MockNamespaceResolver::default().with_mapping(NAMESPACE_NAME, NamespaceId::new(42));
let dml_handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Ok(())])
.with_delete_return([]),
);
let dml_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let authz = Arc::new(MockAuthorizer {});
let delegate = HttpDelegate::new(
@ -1511,11 +1302,7 @@ mod tests {
let mock_namespace_resolver =
MockNamespaceResolver::default().with_mapping(NAMESPACE_NAME, NamespaceId::new(42));
let dml_handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Ok(())])
.with_delete_return([]),
);
let dml_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let delegate = HttpDelegate::new(
MAX_BYTES,
@ -1560,11 +1347,8 @@ mod tests {
}),
));
let dml_handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Ok(()), Ok(()), Ok(())])
.with_delete_return([]),
);
let dml_handler =
Arc::new(MockDmlHandler::default().with_write_return([Ok(()), Ok(()), Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let delegate = HttpDelegate::new(
MAX_BYTES,
@ -1611,24 +1395,6 @@ mod tests {
request_parser.calls().as_slice(),
[MockExtractorCall::V1, MockExtractorCall::V2]
);
// Delete requests hit the v2 parser
let request = Request::builder()
.uri("https://bananas.example/api/v2/delete")
.method("POST")
.body(Body::from(""))
.unwrap();
let _got = delegate.route(request).await;
// The delete should have hit v2.
assert_matches!(
request_parser.calls().as_slice(),
[
MockExtractorCall::V1,
MockExtractorCall::V2,
MockExtractorCall::V2
]
);
}
// The display text of Error gets passed through `ioxd_router::IoxHttpErrorAdaptor` then
@ -1682,6 +1448,11 @@ mod tests {
"not found",
),
(
DeletesUnsupported,
"deletes are not supported",
),
(
NonUtf8Body(std::str::from_utf8(&[0, 159]).unwrap_err()),
"body content is not valid utf8: invalid utf-8 sequence of 1 bytes from index 1",
@ -1760,21 +1531,6 @@ mod tests {
"failed to parse line protocol: timestamp overflows i64",
),
(
ParseDelete({
predicate::delete_predicate::Error::InvalidSyntax { value: "[syntax]".into() }
}),
"failed to parse delete predicate: Invalid predicate syntax: ([syntax])",
),
(
ParseHttpDelete({
delete_predicate::Error::TableInvalid { value: "[table name]".into() }
}),
"failed to parse delete predicate from http request: \
Invalid table name in delete '[table name]'"
),
(
DmlHandler(DmlError::NamespaceNotFound("[namespace name]".into())),
"dml handler error: namespace [namespace name] does not exist",

View File

@ -1,231 +0,0 @@
use snafu::{ResultExt, Snafu};
/// Parse Delete Predicates
/// Parse Error
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(r#"Unable to parse delete string '{}'"#, value))]
Invalid {
source: serde_json::Error,
value: String,
},
#[snafu(display(
r#"Invalid key which is either 'start', 'stop', or 'predicate': '{}'"#,
value
))]
KeywordInvalid { value: String },
#[snafu(display(r#"Invalid timestamp or predicate value: '{}'"#, value))]
ValueInvalid { value: String },
#[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))]
ObjectInvalid { value: String },
#[snafu(display(r#"Invalid table name in delete '{}'"#, value))]
TableInvalid { value: String },
#[snafu(display(r#"Delete must include a start time and a stop time'{}'"#, value))]
StartStopInvalid { value: String },
}
/// Result type for Parser Client
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
const FLUX_TABLE: &str = "_measurement";
/// Data of a parsed delete
///
/// Note that this struct and its functions are used to parse FLUX DELETE,
/// <https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/>, which happens before
/// the parsing of timestamps and sql predicate. The examples below will show FLUX DELETE's syntax which is
/// different from SQL syntax so we need this extra parsing step before invoking sqlparser to parse the
/// sql-format predicates and timestamps
///
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub(crate) struct HttpDeleteRequest {
/// Empty string, "", if no table specified
pub(crate) table_name: String,
pub(crate) start_time: String,
pub(crate) stop_time: String,
pub(crate) predicate: String,
}
/// Return parsed data of an influx delete:
/// A few input examples and their parsed results:
/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\"""
/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50"
///
pub(crate) fn parse_http_delete_request(input: &str) -> Result<HttpDeleteRequest> {
let parsed_obj: serde_json::Value =
serde_json::from_str(input).context(InvalidSnafu { value: input })?;
let mut parsed_delete = HttpDeleteRequest::default();
if let serde_json::Value::Object(items) = parsed_obj {
for item in items {
// The value must be type String
if let Some(val) = item.1.as_str() {
match item.0.to_lowercase().as_str() {
"start" => parsed_delete.start_time = val.to_string(),
"stop" => parsed_delete.stop_time = val.to_string(),
"predicate" => parsed_delete.predicate = val.to_string(),
_ => {
return Err(Error::KeywordInvalid {
value: input.to_string(),
})
}
}
} else {
return Err(Error::ValueInvalid {
value: input.to_string(),
});
}
}
} else {
return Err(Error::ObjectInvalid {
value: input.to_string(),
});
}
// Start or stop is empty
if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() {
return Err(Error::StartStopInvalid {
value: input.to_string(),
});
}
// Extract table from the predicate if any
if parsed_delete.predicate.contains(FLUX_TABLE) {
// since predicate is a conjunctive expression, split them by "and"
let predicate = parsed_delete
.predicate
.replace(" AND ", " and ")
.replace(" ANd ", " and ")
.replace(" And ", " and ")
.replace(" AnD ", " and ");
let split: Vec<&str> = predicate.split("and").collect();
let mut predicate_no_table = "".to_string();
for s in split {
if s.contains(FLUX_TABLE) {
// This should be in form "_measurement = <your_table_name>"
// only <keep your_table_name> by replacing the rest with ""
let table_name = s
.replace(FLUX_TABLE, "")
.replace('=', "")
.trim()
.to_string();
// Do not support white spaces in table name
if table_name.contains(' ') {
return Err(Error::TableInvalid {
value: input.to_string(),
});
}
parsed_delete.table_name = table_name;
} else {
// This is a normal column comparison, put it back to send to sqlparser later
if !predicate_no_table.is_empty() {
predicate_no_table.push_str(" and ")
}
predicate_no_table.push_str(s.trim());
}
}
parsed_delete.predicate = predicate_no_table;
}
Ok(parsed_delete)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A request naming a table, a predicate, and both times is fully parsed.
    #[test]
    fn test_parse_http_delete_full() {
        let input = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;

        let parsed = parse_http_delete_request(input).unwrap();

        assert_eq!(
            parsed,
            HttpDeleteRequest {
                table_name: "mytable".to_owned(),
                predicate: "host=\"Orient.local\"".to_owned(),
                start_time: "1970-01-01T00:00:00Z".to_owned(),
                stop_time: "2070-01-02T00:00:00Z".to_owned(),
            }
        );
    }

    /// Without a `_measurement` clause the table name stays empty.
    #[test]
    fn test_parse_http_delete_no_table() {
        let input = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;

        let parsed = parse_http_delete_request(input).unwrap();

        assert_eq!(
            parsed,
            HttpDeleteRequest {
                predicate: "host=\"Orient.local\"".to_owned(),
                start_time: "1970-01-01T00:00:00Z".to_owned(),
                stop_time: "2070-01-02T00:00:00Z".to_owned(),
                ..Default::default()
            }
        );
    }

    /// An explicitly empty predicate is accepted.
    #[test]
    fn test_parse_http_delete_empty_predicate() {
        let input =
            r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#;

        let parsed = parse_http_delete_request(input).unwrap();

        assert_eq!(
            parsed,
            HttpDeleteRequest {
                start_time: "1970-01-01T00:00:00Z".to_owned(),
                stop_time: "2070-01-02T00:00:00Z".to_owned(),
                ..Default::default()
            }
        );
    }

    /// Omitting the predicate entirely is also accepted.
    #[test]
    fn test_parse_http_delete_no_predicate() {
        let input = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;

        let parsed = parse_http_delete_request(input).unwrap();

        assert_eq!(
            parsed,
            HttpDeleteRequest {
                start_time: "1970-01-01T00:00:00Z".to_owned(),
                stop_time: "2070-01-02T00:00:00Z".to_owned(),
                ..Default::default()
            }
        );
    }

    /// Malformed requests are rejected with descriptive errors.
    #[test]
    fn test_parse_http_delete_negative() {
        // Unknown top-level key.
        let err = parse_http_delete_request(
            r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#,
        )
        .unwrap_err();
        assert!(err
            .to_string()
            .contains("Invalid key which is either 'start', 'stop', or 'predicate'"));

        // Timestamp value that is not a JSON string.
        let err = parse_http_delete_request(r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#)
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("Invalid timestamp or predicate value"));

        // Body that is not valid JSON at all.
        let err = parse_http_delete_request(
            r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#,
        )
        .unwrap_err();
        assert!(err.to_string().contains("Unable to parse delete string"));
    }
}

View File

@ -8,7 +8,7 @@ use hyper::{Body, Request, StatusCode};
use iox_catalog::interface::SoftDeletedRows;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use router::dml_handlers::{DmlError, RetentionError, RpcWriteError, SchemaError};
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
use std::sync::Arc;
pub mod common;
@ -384,14 +384,10 @@ async fn test_delete_unsupported() {
assert_matches!(
&err,
e @ router::server::http::Error::DmlHandler(
DmlError::RpcWrite(
RpcWriteError::DeletesUnsupported
)
) => {
e @ router::server::http::Error::DeletesUnsupported => {
assert_eq!(
e.to_string(),
"dml handler error: deletes are not supported"
"deletes are not supported"
);
}
);