diff --git a/data_types/src/job.rs b/data_types/src/job.rs index 0f29911920..856b877201 100644 --- a/data_types/src/job.rs +++ b/data_types/src/job.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use crate::chunk_metadata::ChunkAddr; -use crate::partition_metadata::PartitionAddr; +use crate::partition_metadata::{DeleteInfo, PartitionAddr}; /// Metadata associated with a set of background tasks /// Used in combination with TrackerRegistry @@ -42,6 +42,9 @@ pub enum Job { /// Wipe preserved catalog WipePreservedCatalog { db_name: Arc<str> }, + + /// Delete data from a table + Delete { delete_info: DeleteInfo }, } impl Job { @@ -56,6 +59,7 @@ impl Job { Self::DropChunk { chunk, .. } => Some(&chunk.db_name), Self::DropPartition { partition, .. } => Some(&partition.db_name), Self::WipePreservedCatalog { db_name, .. } => Some(db_name), + Self::Delete { delete_info, .. } => Some(&delete_info.db_name), } } @@ -70,6 +74,7 @@ impl Job { Self::DropChunk { chunk, .. } => Some(&chunk.partition_key), Self::DropPartition { partition, .. } => Some(&partition.partition_key), Self::WipePreservedCatalog { .. } => None, + Self::Delete { .. } => None, } } @@ -84,6 +89,7 @@ impl Job { Self::DropChunk { chunk, .. } => Some(&chunk.table_name), Self::DropPartition { partition, .. } => Some(&partition.table_name), Self::WipePreservedCatalog { .. } => None, + Self::Delete { delete_info, .. } => Some(&delete_info.table_name), } } @@ -98,6 +104,7 @@ impl Job { Self::DropChunk { chunk, .. } => Some(vec![chunk.chunk_id]), Self::DropPartition { .. } => None, Self::WipePreservedCatalog { .. } => None, + Self::Delete { .. } => None, } } @@ -114,6 +121,7 @@ impl Job { "Drop partition from memory and (if persisted) from object store" } Self::WipePreservedCatalog { .. } => "Wipe preserved catalog", + Self::Delete { .. 
} => "Delete data from table", } } } diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 804db703d1..1d7874a819 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -163,6 +163,38 @@ impl TableSummary { } } +/// Delete information +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct DeleteInfo { + /// Database name + pub db_name: Arc<str>, + + /// Table with data to delete + pub table_name: Arc<str>, + + /// Delete Predicate + // Ideally, this can be any complicated expressions that DataFusion supports + // but in our first version, we only support what our read buffer does which is + // conjunctive expressions with columns being compared to literals using = or != operators. + pub delete_predicate: Arc<str>, + + /// Start time range of deleting data + pub start_time: Arc<str>, + + /// Stop time range of deleting data + pub stop_time: Arc<str>, +} + +impl std::fmt::Display for DeleteInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Delete('{}':'{}':'{}')", + self.db_name, self.table_name, self.delete_predicate + ) + } +} + // Replicate this enum here as it can't be derived from the existing statistics #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] pub enum InfluxDbType { diff --git a/generated_types/protos/influxdata/iox/management/v1/jobs.proto b/generated_types/protos/influxdata/iox/management/v1/jobs.proto index 106d5c03ae..e2917171cb 100644 --- a/generated_types/protos/influxdata/iox/management/v1/jobs.proto +++ b/generated_types/protos/influxdata/iox/management/v1/jobs.proto @@ -40,6 +40,7 @@ message OperationMetadata { PersistChunks persist_chunks = 11; DropChunk drop_chunk = 12; DropPartition drop_partition = 17; + Delete delete = 18; } } @@ -144,3 +145,30 @@ message WipePreservedCatalog { // name of the database string db_name = 1; } + +// Soft delete data from a table on a specified predicate +// Soft delete means 
data is deleted from the customer's point of view but +// the rows still physically exist in IOx storage (MUB, RUB, OS). During query time, +// we will filter out the deleted rows. +// We will implement Hard Delete to purge deleted data. +message Delete { + // name of the database + string db_name = 1; + + // table name + string table_name = 2; + + // delete predicate + // Ideally, this can be any complicated expressions that DataFusion supports + // but in our first version, we only support what our read buffer does which is + // conjunctive expressions with columns being compared to literals using = or != operators. + // Also, to avoid a user mistakenly deleting the whole table, we will force them to + // include delete time range start and stop in different fields defined below + string delete_predicate = 3; + + // start time range of deleting data + string start_time = 4; + + // stop time range of deleting data + string stop_time = 5; +} \ No newline at end of file diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 18aaf29d5d..b384f6fe53 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -82,6 +82,9 @@ service ManagementService { // Drop partition from memory and (if persisted) from object store. 
rpc DropPartition(DropPartitionRequest) returns (DropPartitionResponse); + + // Delete data for a table on a specified predicate + rpc Delete(DeleteRequest) returns (DeleteResponse); } message GetServerIdRequest {} @@ -387,3 +390,31 @@ message DropPartitionRequest { message DropPartitionResponse { } + +// Request to delete data from a table on a specified predicate +message DeleteRequest { + // name of the database + string db_name = 1; + + // table name + string table_name = 2; + + // delete predicate + // Ideally, this can be any complicated expressions that DataFusion supports + // but in our first version, we only support what our read buffer does which is + // conjunctive expressions with columns being compared to literals using = or != operators. + // Also, to avoid a user mistakenly deleting the whole table, we will force them to + // include delete time range start and stop in different fields defined below + string delete_predicate = 3; + + // start time range of deleting data + string start_time = 4; + + // stop time range of deleting data + string stop_time = 5; +} + +message DeleteResponse { + // todo + // string delete_status = 1; +} diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs index 499826f455..841e80e0c5 100644 --- a/generated_types/src/job.rs +++ b/generated_types/src/job.rs @@ -3,7 +3,7 @@ use crate::influxdata::iox::management::v1 as management; use crate::protobuf_type_url_eq; use data_types::chunk_metadata::ChunkAddr; use data_types::job::{Job, OperationStatus}; -use data_types::partition_metadata::PartitionAddr; +use data_types::partition_metadata::{DeleteInfo, PartitionAddr}; use std::convert::TryFrom; use std::sync::Arc; @@ -58,6 +58,13 @@ impl From<Job> for management::operation_metadata::Job { partition_key: partition.partition_key.to_string(), table_name: partition.table_name.to_string(), }), + Job::Delete { delete_info } => Self::Delete(management::Delete { + db_name: delete_info.db_name.to_string(), + table_name: 
delete_info.table_name.to_string(), + delete_predicate: delete_info.delete_predicate.to_string(), + start_time: delete_info.start_time.to_string(), + stop_time: delete_info.stop_time.to_string(), + }), } } } @@ -151,6 +158,21 @@ impl From<management::operation_metadata::Job> for Job { partition_key: Arc::from(partition_key.as_str()), }, }, + Job::Delete(management::Delete { + db_name, + table_name, + delete_predicate, + start_time, + stop_time, + }) => Self::Delete { + delete_info: DeleteInfo { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from(table_name.as_str()), + delete_predicate: Arc::from(delete_predicate.as_str()), + start_time: Arc::from(start_time.as_str()), + stop_time: Arc::from(stop_time.as_str()), + }, + }, } } } diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index e742236389..63f616696f 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -337,6 +337,11 @@ pub enum DropPartitionError { ServerError(tonic::Status), } +/// Errors returned by [`Client::delete`] +#[derive(Debug, Error)] +pub enum DeleteError { + //todo +} /// Errors returned by [`Client::persist_partition`] #[derive(Debug, Error)] pub enum PersistPartitionError { @@ -847,6 +852,22 @@ impl Client { Ok(()) } + /// Delete data from a table on a specified predicate + pub async fn delete( + &mut self, + db_name: impl Into<String> + Send, + table_name: impl Into<String> + Send, + delete_predicate: impl Into<String> + Send, + ) -> Result<(), DeleteError> { + let _db_name = db_name.into(); + let _table_name = table_name.into(); + let _delete_predicate = delete_predicate.into(); + + // todo + + Ok(()) + } + /// Persist given partition. /// /// Errors if there is nothing to persist at the moment as per the lifecycle rules. 
If successful it returns the diff --git a/server/src/db.rs b/server/src/db.rs index 46fa519b7e..59ffc01017 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -637,6 +637,25 @@ impl Db { fut.await.context(TaskCancelled)?.context(LifecycleError) } + /// Delete data from a table on a specified predicate + pub async fn delete( + self: &Arc<Self>, + _table_name: &str, + _delete_predicate: &str, //todo: this might be a Predicate data type + ) -> Result<()> { + let partitions = self.catalog.partitions(); + for partition in &partitions { + let partition = partition.write(); + let chunks = partition.chunks(); + for _chunk in chunks { + // todo + // if this is the chunk of the table, add delete_predicate into the chunk's delete_predicates + } + } + + Ok(()) + } + /// Copies a chunk in the Closed state into the ReadBuffer from /// the mutable buffer and marks the chunk with `Moved` state /// diff --git a/src/commands/database/partition.rs b/src/commands/database/partition.rs index 22046f8046..2c2c8b78d3 100644 --- a/src/commands/database/partition.rs +++ b/src/commands/database/partition.rs @@ -189,6 +189,7 @@ enum Command { /// Unload chunk from read buffer but keep it in object store. UnloadChunk(UnloadChunk), + // Debating: Should we add the Delete command that deletes data for table of this partition? 
} pub async fn command(connection: Connection, config: Config) -> Result<()> { diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index d640127bf7..e887a4b7f0 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -533,6 +533,36 @@ where Ok(Response::new(DropPartitionResponse {})) } + + async fn delete( + &self, + request: tonic::Request<DeleteRequest>, + ) -> Result<tonic::Response<DeleteResponse>, tonic::Status> { + let DeleteRequest { + db_name, + table_name, + delete_predicate, + start_time: _, + stop_time: _, + } = request.into_inner(); + + // Validate that the database name is legit + let db_name = DatabaseName::new(db_name).field("db_name")?; + let db = self + .server + .db(&db_name) + .map_err(default_server_error_handler)?; + + // Todo + // Convert start_time and stop_time to time range + // and make a new predicate that is a conjunction of the time range and the delete_predicate + + db.delete(&table_name, &delete_predicate) + .await + .map_err(default_db_error_handler)?; + + Ok(Response::new(DeleteResponse {})) + } } pub fn make_server( diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 65841b483d..797616867f 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1228,6 +1228,26 @@ async fn test_drop_partition_error() { assert_contains!(err.to_string(), "Cannot drop unpersisted chunk"); } +#[tokio::test] +async fn test_delete() { + let fixture = ServerFixture::create_shared().await; + let _write_client = fixture.write_client(); + let mut management_client = fixture.management_client(); + + let db_name = rand_name(); + + // Todo + // Build an appropriate test DB + let table_name = "test_table"; + let delete_predicate = "col = 123"; + + management_client + .delete(&db_name, table_name, delete_predicate) // note that this function currently does nothing + .await + .unwrap(); + + // Todo: check return delete outcome +} 
#[tokio::test] async fn test_persist_partition() { use data_types::chunk_metadata::ChunkStorage;