Merge pull request #3217 from influxdata/crepererum/disallow_unsequenced_deletes

feat: disallow unsequenced deletes when reading from write buffer
pull/24376/head
kodiakhq[bot] 2021-11-26 10:51:44 +00:00 committed by GitHub
commit 00143315ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 25 deletions

View File

@ -14,7 +14,7 @@
use data_types::{names::OrgBucketMappingError, DatabaseName};
use influxdb_iox_client::format::QueryOutputFormat;
use query::exec::ExecutionContextProvider;
use server::Error;
use server::{database::WriteError, Error};
// External crates
use async_trait::async_trait;
@ -193,23 +193,31 @@ impl HttpDrivenDml for DatabaseServerType {
})
}
DmlOperation::Delete(delete) => {
let db = self
.server
.db(db_name)
.map_err(|_| InnerDmlError::DatabaseNotFound {
let database = self.server.active_database(db_name).map_err(|_| {
InnerDmlError::DatabaseNotFound {
db_name: db_name.to_string(),
}
})?;
db.store_delete(&delete).map_err(|e| match e {
database
.route_operation(&DmlOperation::Delete(delete))
.await
.map_err(|e| match e {
WriteError::DbError { source } => match source {
server::db::Error::DeleteFromTable { table_name, .. } => {
InnerDmlError::TableNotFound {
db_name: db_name.to_string(),
table_name,
}
}
_ => InnerDmlError::InternalError {
db_name: db_name.to_string(),
source: Box::new(source),
},
},
e => InnerDmlError::InternalError {
db_name: db_name.to_string(),
source: Box::new(e),
source: Box::new(dbg!(e)),
},
})?;

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use data_types::non_empty::NonEmptyString;
use data_types::DatabaseName;
use dml::{DmlDelete, DmlMeta};
use dml::{DmlDelete, DmlMeta, DmlOperation};
use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField};
use generated_types::influxdata::iox::delete::v1::*;
use server::Server;
@ -12,7 +12,7 @@ struct DeleteService {
server: Arc<Server>,
}
use super::error::{default_db_error_handler, default_server_error_handler};
use super::error::{default_database_write_error_handler, default_server_error_handler};
#[tonic::async_trait]
impl delete_service_server::DeleteService for DeleteService {
@ -35,12 +35,15 @@ impl delete_service_server::DeleteService for DeleteService {
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
let database = self
.server
.db(&db_name)
.active_database(&db_name)
.map_err(default_server_error_handler)?;
db.store_delete(&delete).map_err(default_db_error_handler)?;
database
.route_operation(&DmlOperation::Delete(delete))
.await
.map_err(default_database_write_error_handler)?;
Ok(Response::new(DeleteResponse {}))
}

View File

@ -10,6 +10,10 @@ use generated_types::influxdata::iox::write_buffer::v1::{
write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
};
use influxdb_iox_client::{
delete::{
generated_types::{Predicate, TimestampRange},
DeleteError,
},
management::{generated_types::WriteBufferCreationConfig, CreateDatabaseError},
write::WriteError,
};
@ -153,6 +157,23 @@ async fn cant_write_to_db_reading_from_write_buffer() {
assert_contains!(err.to_string(), "only allowed through write buffer");
assert!(matches!(dbg!(err), WriteError::ServerError(_)));
// Deleting from this database is an error; all data comes from write buffer
let mut delete_client = server.delete_client();
let err = delete_client
.delete(
&db_name,
"temp",
Predicate {
range: Some(TimestampRange { start: 1, end: 2 }),
exprs: vec![],
},
)
.await
.expect_err("expected delete to fail");
assert_contains!(err.to_string(), "only allowed through write buffer");
assert!(matches!(dbg!(err), DeleteError::ServerError(_)));
}
#[tokio::test]

View File

@ -521,12 +521,12 @@ impl Db {
}
/// Store a delete
pub fn store_delete(&self, delete: &DmlDelete) -> Result<()> {
pub(crate) fn store_delete(&self, delete: &DmlDelete) -> Result<()> {
self.store_filtered_delete(delete, DeleteFilterNone::default())
}
/// Store a delete with the provided [`DeleteFilter`]
pub fn store_filtered_delete(
pub(crate) fn store_filtered_delete(
&self,
delete: &DmlDelete,
filter: impl DeleteFilter,
@ -549,6 +549,8 @@ impl Db {
/// Delete data from a table on a specified predicate
///
/// Returns an error if the table cannot be found in the catalog
///
/// **WARNING: Only use that when no write buffer is used.**
pub fn delete(&self, table_name: &str, delete_predicate: Arc<DeletePredicate>) -> Result<()> {
self.delete_filtered(table_name, delete_predicate, DeleteFilterNone::default())
}
@ -1028,7 +1030,7 @@ impl Db {
}
/// Writes the provided [`DmlWrite`] to this database with the provided [`WriteFilter`]
pub(crate) fn store_filtered_write(
pub fn store_filtered_write(
&self,
db_write: &DmlWrite,
filter: impl WriteFilter,