From a4183de4115cf608f8492d82f258229bd4f1e37e Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 31 Aug 2021 17:42:07 -0400 Subject: [PATCH] feat: more progress on the delete flow from grpc API to catalog chunks --- .../iox/management/v1/service.proto | 3 +- influxdb_iox_client/src/client/management.rs | 44 ++++++++++++++++--- influxdb_line_protocol/src/lib.rs | 32 +++++++++++++- query/src/lib.rs | 7 +++ query/src/predicate.rs | 1 - query/src/test.rs | 5 +++ server/src/db.rs | 32 ++++++++++---- server/src/db/catalog/chunk.rs | 32 +++++++++++++- server/src/db/chunk.rs | 5 +++ src/commands/database/partition.rs | 2 +- src/influxdb_ioxd/rpc/management.rs | 29 +++++++++--- tests/end_to_end_cases/management_api.rs | 4 +- 12 files changed, 168 insertions(+), 28 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 598a6084fc..6e358afa1d 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -448,6 +448,5 @@ message DeleteRequest { } message DeleteResponse { - // todo - // string delete_status = 1; + // NGA todo: response something? } diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index fc90163c44..9034319de1 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -360,7 +360,21 @@ pub enum DropPartitionError { /// Errors returned by [`Client::delete`] #[derive(Debug, Error)] pub enum DeleteError { - //todo + /// Database not found + #[error("Not found: {}", .0)] + NotFound(String), + + /// Response contained no payload + #[error("Server returned an empty response")] + EmptyResponse, + + /// Server indicated that it is not (yet) available + #[error("Server unavailable: {}", .0.message())] + Unavailable(tonic::Status), + + /// Client received an unexpected error from the server + #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] + ServerError(tonic::Status), } /// Errors returned by [`Client::persist_partition`] #[derive(Debug, Error)] @@ -932,13 +946,33 @@ impl Client { db_name: impl Into + Send, table_name: impl Into + Send, delete_predicate: impl Into + Send, + start_time: impl Into + Send, + stop_time: impl Into + Send, ) -> Result<(), DeleteError> { - let _db_name = db_name.into(); - let _table_name = table_name.into(); - let _delete_predicate = delete_predicate.into(); + let db_name = db_name.into(); + let table_name = table_name.into(); + let delete_predicate = delete_predicate.into(); + let start_time = start_time.into(); + let stop_time = stop_time.into(); - // todo + // NGA todo: Should parse and validate start_time, stop_time, and delete_predicate here + // at in client or send them to the server and do the parsing and validation there? + self.inner + .delete(DeleteRequest { + db_name, + table_name, + delete_predicate, + start_time, + stop_time, + }) + .await + .map_err(|status| match status.code() { + tonic::Code::NotFound => DeleteError::NotFound(status.message().to_string()), + tonic::Code::Unavailable => DeleteError::Unavailable(status), + _ => DeleteError::ServerError(status), + })?; + // NGA todo: return a handle to the delete? Ok(()) } diff --git a/influxdb_line_protocol/src/lib.rs b/influxdb_line_protocol/src/lib.rs index a6057cef97..1f607448f5 100644 --- a/influxdb_line_protocol/src/lib.rs +++ b/influxdb_line_protocol/src/lib.rs @@ -711,7 +711,7 @@ fn integral_value_signed(i: &str) -> IResult<&str, &str> { recognize(preceded(opt(tag("-")), digit1))(i) } -fn timestamp(i: &str) -> IResult<&str, i64> { +pub fn timestamp(i: &str) -> IResult<&str, i64> { map_fail(integral_value_signed, |value| { value.parse().context(TimestampValueInvalid { value }) })(i) @@ -1693,6 +1693,36 @@ bar value2=2i 123"#; assert_eq!(vals[0].field_set[0].1.unwrap_i64(), 1); } + #[test] + fn parse_timestamp() { + let input = r#"123"#; + let time = timestamp(input).unwrap(); + assert_eq!(time.1, 123); + } + + #[test] + fn parse_timestamp_negative() { + let input = r#"-123"#; + let time = timestamp(input).unwrap(); + assert_eq!(time.1, -123); + } + + #[test] + fn parse_timestamp_out_of_range() { + let input = r#"99999999999999999999999999999999"#; + let time = timestamp(input); + assert!( + matches!( + time, + Err(nom::Err::Failure( + super::Error::TimestampValueInvalid { .. } + )) + ), + "Wrong error: {:?}", + time, + ); + } + #[test] fn parse_negative_timestamp() { let input = r#"foo value1=1i -123"#; diff --git a/query/src/lib.rs b/query/src/lib.rs index 5a115ae9ab..951b099758 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -47,6 +47,9 @@ pub trait QueryChunkMeta: Sized { /// return a reference to the summary of the data held in this chunk fn schema(&self) -> Arc; + + // return a reference to delete predicates of the chunk + fn delete_predicates(&self) -> Arc>; } /// A `Database` is the main trait implemented by the IOx subsystems @@ -161,6 +164,10 @@ where fn schema(&self) -> Arc { self.as_ref().schema() } + + fn delete_predicates(&self) -> Arc> { + self.as_ref().delete_predicates() + } } /// Compute a sort key that orders lower cardinality columns first diff --git a/query/src/predicate.rs b/query/src/predicate.rs index 3e2a077418..0a500e6f4a 100644 --- a/query/src/predicate.rs +++ b/query/src/predicate.rs @@ -501,7 +501,6 @@ mod tests { assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree"))); assert_eq!(predicate.exprs[7], lit(5).eq(col("city"))); } - #[test] fn predicate_display_ts() { // TODO make this a doc example? diff --git a/query/src/test.rs b/query/src/test.rs index 89761b9b46..8688ad8024 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -897,6 +897,11 @@ impl QueryChunkMeta for TestChunk { fn schema(&self) -> Arc { Arc::clone(&self.schema) } + + fn delete_predicates(&self) -> Arc> { + let pred: Vec = vec![]; + Arc::new(pred) + } } /// Return the raw data from the list of chunks diff --git a/server/src/db.rs b/server/src/db.rs index fb68d13d6d..bc85611dc7 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -9,7 +9,7 @@ use crate::{ chunk::{CatalogChunk, ChunkStage}, partition::Partition, table::TableSchemaUpsertHandle, - Catalog, TableNameFilter, + Catalog, Error as CatalogError, TableNameFilter, }, lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb}, }, @@ -116,6 +116,16 @@ pub enum Error { source: mutable_buffer::chunk::Error, }, + #[snafu(display( + "Cannot delete data from non-existing table, {}: {}", + table_name, + source + ))] + DeleteFromTable { + table_name: String, + source: CatalogError, + }, + #[snafu(display( "Storing sequenced entry failed with the following error(s), and possibly more: {}", errors.iter().map(ToString::to_string).collect::>().join(", ") @@ -634,16 +644,22 @@ impl Db { /// Delete data from a table on a specified predicate pub async fn delete( self: &Arc, - _table_name: &str, - _delete_predicate: &str, //todo: this might be a Predicate dta type + table_name: &str, + delete_predicate: &Predicate, ) -> Result<()> { - let partitions = self.catalog.partitions(); - for partition in &partitions { + // get all partitions of this table + let table = self + .catalog + .table(table_name) + .context(DeleteFromTable { table_name })?; + let partitions = table.partitions(); + for partition in partitions { let partition = partition.write(); let chunks = partition.chunks(); - for _chunk in chunks { - // todo - // if this is the chunk of the table, add delete_predicate into the chunk's delete_predicates + for chunk in chunks { + // NGA todo: verify where to close MUB before adding the predicate + let mut chunk = chunk.write(); + chunk.add_delete_predicate(delete_predicate); } } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index d190beeaa9..c6f345eec2 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -77,7 +77,7 @@ pub struct ChunkMetadata { pub schema: Arc, /// Delete predicates of this chunk - pub delete_predicates: Arc>, + pub delete_predicates: Arc>, } /// Different memory representations of a frozen chunk. @@ -309,7 +309,7 @@ impl CatalogChunk { meta: Arc::new(ChunkMetadata { table_summary: Arc::new(chunk.table_summary()), schema, - delete_predicates: Arc::new(vec![]), //todo: consider to use the one of the given chunk if appropriate + delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate }), representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)), }; @@ -464,6 +464,34 @@ impl CatalogChunk { } } + pub fn add_delete_predicate(&mut self, _delete_predicate: &Predicate) { + match &self.stage { + ChunkStage::Open { mb_chunk: _ } => { + // NGA todo: + // Close the MUB + // Add the delete_predicate to it + } + ChunkStage::Frozen { representation, .. } => match representation { + ChunkStageFrozenRepr::MutableBufferSnapshot(_snapshot) => { + // NGA todo + } + ChunkStageFrozenRepr::ReadBuffer(_rb_chunk) => { + // NGA todo + } + }, + ChunkStage::Persisted { + parquet: _, + read_buffer: Some(_read_buffer), + .. + } => { + // NGA todo + } + ChunkStage::Persisted { parquet: _, .. } => { + // NGA todo + } + } + } + /// Record a write of row data to this chunk /// /// `time_of_write` is the wall clock time of the write diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index bbd18810b0..f7a0242a60 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -490,6 +490,11 @@ impl QueryChunkMeta for DbChunk { fn schema(&self) -> Arc { Arc::clone(&self.meta.schema) } + + // return a reference to delete predicates of the chunk + fn delete_predicates(&self) -> Arc> { + Arc::clone(&self.meta.delete_predicates) + } } #[cfg(test)] diff --git a/src/commands/database/partition.rs b/src/commands/database/partition.rs index 2c2c8b78d3..9a4c29da48 100644 --- a/src/commands/database/partition.rs +++ b/src/commands/database/partition.rs @@ -189,7 +189,7 @@ enum Command { /// Unload chunk from read buffer but keep it in object store. UnloadChunk(UnloadChunk), - // Debating: Should we add the Delete command that deletes data for table of this partition? + // NGA:todo - Debating: Should we add the Delete command that deletes data for table of this partition? } pub async fn command(connection: Connection, config: Config) -> Result<()> { diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 975a15aa12..1ce534c536 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -6,6 +6,7 @@ use std::time::Instant; use data_types::{server_id::ServerId, DatabaseName}; use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound}; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; +use query::predicate::PredicateBuilder; use query::QueryDatabase; use server::rules::ProvidedDatabaseRules; use server::{ApplicationState, ConnectionManager, Error, Server}; @@ -560,11 +561,22 @@ where let DeleteRequest { db_name, table_name, - delete_predicate, - start_time: _, - stop_time: _, + delete_predicate: _, + start_time, + stop_time, } = request.into_inner(); + use influxdb_line_protocol::timestamp; + + // Parse and Validate start time and stop time + let start = timestamp(start_time.as_str()).unwrap().1; + let stop = timestamp(stop_time.as_str()).unwrap().1; + assert!(start < stop, "Stop time has to be after start time"); + + // parse and validate delete predicate which is a conjunctive expressions + // with columns being compared to literals using = or != operators + // NGA: todo + // Validate that the database name is legit let db_name = DatabaseName::new(db_name).field("db_name")?; let db = self @@ -572,14 +584,17 @@ where .db(&db_name) .map_err(default_server_error_handler)?; - // Todo - // Convert start_time and stop_time to time range - // and make a new predicate that is a conjunction of the time range and the delete_predicate + // Build the delete predicate that include all delete expression and time range + let del_predicate = PredicateBuilder::new() + .timestamp_range(start, stop) + //.add_expr // NGA todo: repeat to add delete expressions here + .build(); - db.delete(&table_name, &delete_predicate) + db.delete(&table_name, &del_predicate) .await .map_err(default_db_error_handler)?; + // NGA todo: return a delete handle with the response? Ok(Response::new(DeleteResponse {})) } } diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index bce53c9e44..f9c9ff3efe 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1383,9 +1383,11 @@ async fn test_delete() { // Build an appropriate test DB let table_name = "test_table"; let delete_predicate = "col = 123"; + let start = "100"; + let stop = "1000"; management_client - .delete(&db_name, table_name, delete_predicate) // note that this function currently does nothing + .delete(&db_name, table_name, delete_predicate, start, stop) // note that this function currently does nothing .await .unwrap();