diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs index 163b4b6e17..e8adb85196 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs @@ -14,7 +14,7 @@ use data_types::{names::OrgBucketMappingError, DatabaseName}; use influxdb_iox_client::format::QueryOutputFormat; use query::exec::ExecutionContextProvider; -use server::Error; +use server::{database::WriteError, Error}; // External crates use async_trait::async_trait; @@ -193,26 +193,34 @@ impl HttpDrivenDml for DatabaseServerType { }) } DmlOperation::Delete(delete) => { - let db = self - .server - .db(db_name) - .map_err(|_| InnerDmlError::DatabaseNotFound { + let database = self.server.active_database(db_name).map_err(|_| { + InnerDmlError::DatabaseNotFound { db_name: db_name.to_string(), - })?; - - db.store_delete(&delete).map_err(|e| match e { - server::db::Error::DeleteFromTable { table_name, .. } => { - InnerDmlError::TableNotFound { - db_name: db_name.to_string(), - table_name, - } } - e => InnerDmlError::InternalError { - db_name: db_name.to_string(), - source: Box::new(e), - }, })?; + database + .route_operation(&DmlOperation::Delete(delete)) + .await + .map_err(|e| match e { + WriteError::DbError { source } => match source { + server::db::Error::DeleteFromTable { table_name, .. } => { + InnerDmlError::TableNotFound { + db_name: db_name.to_string(), + table_name, + } + } + _ => InnerDmlError::InternalError { + db_name: db_name.to_string(), + source: Box::new(source), + }, + }, + e => InnerDmlError::InternalError { + db_name: db_name.to_string(), + source: Box::new(dbg!(e)), + }, + })?; + Ok(()) } } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs index 85133af32e..661898f20b 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use data_types::non_empty::NonEmptyString; use data_types::DatabaseName; -use dml::{DmlDelete, DmlMeta}; +use dml::{DmlDelete, DmlMeta, DmlOperation}; use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField}; use generated_types::influxdata::iox::delete::v1::*; use server::Server; @@ -12,7 +12,7 @@ struct DeleteService { server: Arc, } -use super::error::{default_db_error_handler, default_server_error_handler}; +use super::error::{default_database_write_error_handler, default_server_error_handler}; #[tonic::async_trait] impl delete_service_server::DeleteService for DeleteService { @@ -35,12 +35,15 @@ impl delete_service_server::DeleteService for DeleteService { // Validate that the database name is legit let db_name = DatabaseName::new(db_name).scope("db_name")?; - let db = self + let database = self .server - .db(&db_name) + .active_database(&db_name) .map_err(default_server_error_handler)?; - db.store_delete(&delete).map_err(default_db_error_handler)?; + database + .route_operation(&DmlOperation::Delete(delete)) + .await + .map_err(default_database_write_error_handler)?; Ok(Response::new(DeleteResponse {})) } diff --git a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs index a3ba682cc7..419dc75b48 100644 --- a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs +++ b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs @@ -10,6 +10,10 @@ use generated_types::influxdata::iox::write_buffer::v1::{ write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection, }; use influxdb_iox_client::{ + delete::{ + generated_types::{Predicate, TimestampRange}, + DeleteError, + }, management::{generated_types::WriteBufferCreationConfig, CreateDatabaseError}, write::WriteError, }; @@ -153,6 +157,23 @@ async fn cant_write_to_db_reading_from_write_buffer() { assert_contains!(err.to_string(), "only allowed through write buffer"); assert!(matches!(dbg!(err), WriteError::ServerError(_))); + + // Deleting from this database is an error; all data comes from write buffer + let mut delete_client = server.delete_client(); + let err = delete_client + .delete( + &db_name, + "temp", + Predicate { + range: Some(TimestampRange { start: 1, end: 2 }), + exprs: vec![], + }, + ) + .await + .expect_err("expected delete to fail"); + + assert_contains!(err.to_string(), "only allowed through write buffer"); + assert!(matches!(dbg!(err), DeleteError::ServerError(_))); } #[tokio::test] diff --git a/server/src/db.rs b/server/src/db.rs index d899a19fa4..dc3fce9ee9 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -521,12 +521,12 @@ impl Db { } /// Store a delete - pub fn store_delete(&self, delete: &DmlDelete) -> Result<()> { + pub(crate) fn store_delete(&self, delete: &DmlDelete) -> Result<()> { self.store_filtered_delete(delete, DeleteFilterNone::default()) } /// Store a delete with the provided [`DeleteFilter`] - pub fn store_filtered_delete( + pub(crate) fn store_filtered_delete( &self, delete: &DmlDelete, filter: impl DeleteFilter, @@ -549,6 +549,8 @@ impl Db { /// Delete data from a table on a specified predicate /// /// Returns an error if the table cannot be found in the catalog + /// + /// **WARNING: Only use that when no write buffer is used.** pub fn delete(&self, table_name: &str, delete_predicate: Arc) -> Result<()> { self.delete_filtered(table_name, delete_predicate, DeleteFilterNone::default()) } @@ -1028,7 +1030,7 @@ impl Db { } /// Writes the provided [`DmlWrite`] to this database with the provided [`WriteFilter`] - pub(crate) fn store_filtered_write( + pub fn store_filtered_write( &self, db_write: &DmlWrite, filter: impl WriteFilter,