Merge pull request #2425 from influxdata/ntran/issue_delete

feat: Management API for delete
kodiakhq[bot] 2021-08-27 21:59:22 +00:00 committed by GitHub
commit 317a83fc50
10 changed files with 214 additions and 2 deletions

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::chunk_metadata::ChunkAddr;
use crate::partition_metadata::PartitionAddr;
use crate::partition_metadata::{DeleteInfo, PartitionAddr};
/// Metadata associated with a set of background tasks
/// Used in combination with TrackerRegistry
@@ -42,6 +42,9 @@ pub enum Job {
/// Wipe preserved catalog
WipePreservedCatalog { db_name: Arc<str> },
/// Delete data from a table
Delete { delete_info: DeleteInfo },
}
impl Job {
@@ -56,6 +59,7 @@ impl Job {
Self::DropChunk { chunk, .. } => Some(&chunk.db_name),
Self::DropPartition { partition, .. } => Some(&partition.db_name),
Self::WipePreservedCatalog { db_name, .. } => Some(db_name),
Self::Delete { delete_info, .. } => Some(&delete_info.db_name),
}
}
@@ -70,6 +74,7 @@ impl Job {
Self::DropChunk { chunk, .. } => Some(&chunk.partition_key),
Self::DropPartition { partition, .. } => Some(&partition.partition_key),
Self::WipePreservedCatalog { .. } => None,
Self::Delete { .. } => None,
}
}
@@ -84,6 +89,7 @@ impl Job {
Self::DropChunk { chunk, .. } => Some(&chunk.table_name),
Self::DropPartition { partition, .. } => Some(&partition.table_name),
Self::WipePreservedCatalog { .. } => None,
Self::Delete { delete_info, .. } => Some(&delete_info.table_name),
}
}
@@ -98,6 +104,7 @@ impl Job {
Self::DropChunk { chunk, .. } => Some(vec![chunk.chunk_id]),
Self::DropPartition { .. } => None,
Self::WipePreservedCatalog { .. } => None,
Self::Delete { .. } => None,
}
}
@@ -114,6 +121,7 @@ impl Job {
"Drop partition from memory and (if persisted) from object store"
}
Self::WipePreservedCatalog { .. } => "Wipe preserved catalog",
Self::Delete { .. } => "Delete data from table",
}
}
}
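To make the new variant concrete, here is a minimal sketch of building a Job::Delete and reading it back through the accessors above. The accessor names (db_name, table_name, partition_key, chunk_ids) are inferred from the hunks rather than shown in full, and all values are illustrative.

use data_types::job::Job;
use data_types::partition_metadata::DeleteInfo;
use std::sync::Arc;

fn main() {
    let job = Job::Delete {
        delete_info: DeleteInfo {
            db_name: Arc::from("my_db"),
            table_name: Arc::from("test_table"),
            delete_predicate: Arc::from("col = 123"),
            start_time: Arc::from("100"),
            stop_time: Arc::from("200"),
        },
    };

    // A delete is scoped to a database and table, but not to a partition or chunks.
    assert_eq!(job.db_name().map(|n| n.as_ref()), Some("my_db"));
    assert_eq!(job.table_name().map(|n| n.as_ref()), Some("test_table"));
    assert!(job.partition_key().is_none());
    assert!(job.chunk_ids().is_none());
}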

View File

@@ -163,6 +163,38 @@ impl TableSummary {
}
}
/// Delete information
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct DeleteInfo {
/// Database name
pub db_name: Arc<str>,
/// Table with data to delete
pub table_name: Arc<str>,
/// Delete predicate
// Ideally this could be any expression that DataFusion supports,
// but in this first version we only support what our read buffer does:
// conjunctive expressions comparing columns to literals with the = or != operators.
pub delete_predicate: Arc<str>,
/// Start of the time range of data to delete
pub start_time: Arc<str>,
/// Stop of the time range of data to delete
pub stop_time: Arc<str>,
}
impl std::fmt::Display for DeleteInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Delete('{}':'{}':'{}')",
self.db_name, self.table_name, self.delete_predicate
)
}
}
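A quick check of the Display impl above, as a sketch (values made up; note that the time range is not part of the rendered string):

use data_types::partition_metadata::DeleteInfo;
use std::sync::Arc;

fn main() {
    let info = DeleteInfo {
        db_name: Arc::from("my_db"),
        table_name: Arc::from("test_table"),
        delete_predicate: Arc::from("col = 123"),
        start_time: Arc::from("100"),
        stop_time: Arc::from("200"),
    };
    assert_eq!(info.to_string(), "Delete('my_db':'test_table':'col = 123')");
}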
// Replicate this enum here as it can't be derived from the existing statistics
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
pub enum InfluxDbType {

View File

@@ -40,6 +40,7 @@ message OperationMetadata {
PersistChunks persist_chunks = 11;
DropChunk drop_chunk = 12;
DropPartition drop_partition = 17;
Delete delete = 18;
}
}
@@ -144,3 +145,30 @@ message WipePreservedCatalog {
// name of the database
string db_name = 1;
}
// Soft delete data from a table matching a specified predicate.
// Soft delete means the data is deleted from the customer's point of view but
// still physically exists in IOx storage (MUB, RUB, OS). At query time,
// we filter out the deleted rows.
// A hard delete that purges the deleted data will be implemented later.
message Delete {
// name of the database
string db_name = 1;
// table name
string table_name = 2;
// delete predicate
// Ideally this could be any expression that DataFusion supports,
// but in this first version we only support what our read buffer does:
// conjunctive expressions comparing columns to literals with the = or != operators.
// Also, to keep users from deleting a whole table by mistake, we require them to
// provide the delete time range start and stop in the separate fields defined below.
string delete_predicate = 3;
// start of the time range of data to delete
string start_time = 4;
// stop of the time range of data to delete
string stop_time = 5;
}

View File

@@ -82,6 +82,9 @@ service ManagementService {
// Drop partition from memory and (if persisted) from object store.
rpc DropPartition(DropPartitionRequest) returns (DropPartitionResponse);
// Delete data from a table matching a specified predicate
rpc Delete(DeleteRequest) returns (DeleteResponse);
}
message GetServerIdRequest {}
@@ -387,3 +390,31 @@ message DropPartitionRequest {
message DropPartitionResponse {
}
// Request to delete data from a table matching a specified predicate
message DeleteRequest {
// name of the database
string db_name = 1;
// table name
string table_name = 2;
// delete predicate
// Ideally this could be any expression that DataFusion supports,
// but in this first version we only support what our read buffer does:
// conjunctive expressions comparing columns to literals with the = or != operators.
// Also, to keep users from deleting a whole table by mistake, we require them to
// include the delete time range start and stop in the separate fields defined below.
string delete_predicate = 3;
// start of the time range of data to delete
string start_time = 4;
// stop of the time range of data to delete
string stop_time = 5;
}
message DeleteResponse {
// TODO: report the outcome of the delete, e.g.
// string delete_status = 1;
}
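For callers outside the Rust client below, the RPC can be exercised directly with the tonic-generated stubs. A sketch, assuming the standard prost/tonic codegen paths for this package (the module path mirrors the crate::influxdata::iox::management::v1 import seen later in this PR; the address and values are illustrative):

use generated_types::influxdata::iox::management::v1::{
    management_service_client::ManagementServiceClient, DeleteRequest,
};

async fn delete_via_grpc() -> Result<(), Box<dyn std::error::Error>> {
    // Address of a running IOx server (illustrative).
    let mut client = ManagementServiceClient::connect("http://127.0.0.1:8082").await?;
    let request = DeleteRequest {
        db_name: "my_db".to_string(),
        table_name: "test_table".to_string(),
        delete_predicate: "col = 123".to_string(),
        start_time: "100".to_string(),
        stop_time: "200".to_string(),
    };
    // DeleteResponse carries no fields yet; delete_status is only sketched above.
    let _response = client.delete(request).await?.into_inner();
    Ok(())
}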

View File

@@ -3,7 +3,7 @@ use crate::influxdata::iox::management::v1 as management;
use crate::protobuf_type_url_eq;
use data_types::chunk_metadata::ChunkAddr;
use data_types::job::{Job, OperationStatus};
use data_types::partition_metadata::PartitionAddr;
use data_types::partition_metadata::{DeleteInfo, PartitionAddr};
use std::convert::TryFrom;
use std::sync::Arc;
@@ -58,6 +58,13 @@ impl From<Job> for management::operation_metadata::Job {
partition_key: partition.partition_key.to_string(),
table_name: partition.table_name.to_string(),
}),
Job::Delete { delete_info } => Self::Delete(management::Delete {
db_name: delete_info.db_name.to_string(),
table_name: delete_info.table_name.to_string(),
delete_predicate: delete_info.delete_predicate.to_string(),
start_time: delete_info.start_time.to_string(),
stop_time: delete_info.stop_time.to_string(),
}),
}
}
}
@@ -151,6 +158,21 @@ impl From<management::operation_metadata::Job> for Job {
partition_key: Arc::from(partition_key.as_str()),
},
},
Job::Delete(management::Delete {
db_name,
table_name,
delete_predicate,
start_time,
stop_time,
}) => Self::Delete {
delete_info: DeleteInfo {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
delete_predicate: Arc::from(delete_predicate.as_str()),
start_time: Arc::from(start_time.as_str()),
stop_time: Arc::from(stop_time.as_str()),
},
},
}
}
}
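The two From impls above should round-trip. A small sketch with illustrative values (the generated_types module path is an assumption based on the import at the top of this file):

use data_types::job::Job;
use data_types::partition_metadata::DeleteInfo;
use generated_types::influxdata::iox::management::v1 as management;
use std::sync::Arc;

fn main() {
    let job = Job::Delete {
        delete_info: DeleteInfo {
            db_name: Arc::from("my_db"),
            table_name: Arc::from("test_table"),
            delete_predicate: Arc::from("col = 123"),
            start_time: Arc::from("100"),
            stop_time: Arc::from("200"),
        },
    };

    // Job -> protobuf metadata -> Job keeps the payload intact.
    let proto: management::operation_metadata::Job = job.into();
    let back: Job = proto.into();
    match back {
        Job::Delete { delete_info } => {
            assert_eq!(delete_info.delete_predicate.as_ref(), "col = 123")
        }
        _ => unreachable!("round-trip changed the job variant"),
    }
}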

View File

@@ -337,6 +337,11 @@ pub enum DropPartitionError {
ServerError(tonic::Status),
}
/// Errors returned by [`Client::delete`]
#[derive(Debug, Error)]
pub enum DeleteError {
// TODO: add error variants once the server-side delete is implemented
}
/// Errors returned by [`Client::persist_partition`]
#[derive(Debug, Error)]
pub enum PersistPartitionError {
@@ -847,6 +852,22 @@ impl Client {
Ok(())
}
/// Delete data from a table matching a specified predicate
pub async fn delete(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
delete_predicate: impl Into<String> + Send,
) -> Result<(), DeleteError> {
let _db_name = db_name.into();
let _table_name = table_name.into();
let _delete_predicate = delete_predicate.into();
// TODO: call the ManagementService Delete RPC
Ok(())
}
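Once the stub is wired up, a call would look roughly like this (a sketch: connection setup is assumed to follow the crate's usual Builder pattern, and the address and values are illustrative):

use influxdb_iox_client::{connection::Builder, management};

async fn delete_example() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Builder::default().build("http://127.0.0.1:8082").await?;
    let mut client = management::Client::new(connection);
    // Currently a no-op: the stub consumes its arguments and returns Ok(()).
    client.delete("my_db", "test_table", "col = 123").await?;
    Ok(())
}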
/// Persist given partition.
///
/// Errors if there is nothing to persist at the moment as per the lifecycle rules. If successful it returns the

View File

@@ -637,6 +637,25 @@ impl Db {
fut.await.context(TaskCancelled)?.context(LifecycleError)
}
/// Delete data from a table matching a specified predicate
pub async fn delete(
self: &Arc<Self>,
_table_name: &str,
_delete_predicate: &str, // TODO: this might become a Predicate data type
) -> Result<()> {
let partitions = self.catalog.partitions();
for partition in &partitions {
let partition = partition.write();
let chunks = partition.chunks();
for _chunk in chunks {
// TODO: if this chunk belongs to the table, add delete_predicate to the
// chunk's delete_predicates (a toy sketch follows after this function)
}
}
Ok(())
}
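To illustrate the bookkeeping the TODO is aiming for, here is a self-contained toy: chunks of the target table record the predicate, and query paths would later filter out matching rows. The ToyChunk type is a hypothetical stand-in, not the IOx catalog API.

/// Toy stand-in for a catalog chunk that records delete predicates.
#[derive(Debug, Default)]
struct ToyChunk {
    table_name: String,
    delete_predicates: Vec<String>,
}

fn delete(chunks: &mut [ToyChunk], table_name: &str, delete_predicate: &str) {
    for chunk in chunks.iter_mut() {
        // Only chunks belonging to the target table record the predicate.
        if chunk.table_name == table_name {
            chunk.delete_predicates.push(delete_predicate.to_string());
        }
    }
}

fn main() {
    let mut chunks = vec![
        ToyChunk { table_name: "test_table".into(), ..Default::default() },
        ToyChunk { table_name: "other_table".into(), ..Default::default() },
    ];
    delete(&mut chunks, "test_table", "col = 123");
    assert_eq!(chunks[0].delete_predicates, vec!["col = 123".to_string()]);
    assert!(chunks[1].delete_predicates.is_empty());
}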
/// Copies a chunk in the Closed state into the ReadBuffer from
/// the mutable buffer and marks the chunk with `Moved` state
///

View File

@@ -189,6 +189,7 @@ enum Command {
/// Unload chunk from read buffer but keep it in object store.
UnloadChunk(UnloadChunk),
// Debating: should we add a Delete command that deletes data for a table in this partition? (a sketch follows below)
}
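If that command were added, it would presumably mirror the neighboring variants. A hypothetical sketch only, assuming the CLI's existing StructOpt derive; this PR does not add it:

/// Delete data from a table in this partition (hypothetical)
Delete(Delete),

#[derive(Debug, StructOpt)]
struct Delete {
    /// The name of the database
    db_name: String,

    /// The name of the table
    table_name: String,

    /// The delete predicate, e.g. "col = 123"
    predicate: String,
}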
pub async fn command(connection: Connection, config: Config) -> Result<()> {

View File

@@ -533,6 +533,36 @@ where
Ok(Response::new(DropPartitionResponse {}))
}
async fn delete(
&self,
request: tonic::Request<DeleteRequest>,
) -> Result<tonic::Response<DeleteResponse>, tonic::Status> {
let DeleteRequest {
db_name,
table_name,
delete_predicate,
start_time: _,
stop_time: _,
} = request.into_inner();
// Validate the database name
let db_name = DatabaseName::new(db_name).field("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
// TODO: convert start_time and stop_time to a time range and build a new
// predicate that is the conjunction of that time range and the delete_predicate
// (a sketch follows after this impl block)
db.delete(&table_name, &delete_predicate)
.await
.map_err(default_db_error_handler)?;
Ok(Response::new(DeleteResponse {}))
}
}
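The remaining TODO could start as something like the helper below. Hypothetical, not the final design: the bounds are assumed to arrive as integer nanosecond timestamps in string form, and the real version would likely build a typed predicate rather than a string.

/// Hypothetical: conjoin the mandatory time range with the user's predicate.
fn conjoin_time_range(
    delete_predicate: &str,
    start_time: &str,
    stop_time: &str,
) -> Result<String, std::num::ParseIntError> {
    let start: i64 = start_time.parse()?;
    let stop: i64 = stop_time.parse()?;
    Ok(format!(
        "time >= {} AND time < {} AND ({})",
        start, stop, delete_predicate
    ))
}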
pub fn make_server<M>(

View File

@@ -1228,6 +1228,26 @@ async fn test_drop_partition_error() {
assert_contains!(err.to_string(), "Cannot drop unpersisted chunk");
}
#[tokio::test]
async fn test_delete() {
let fixture = ServerFixture::create_shared().await;
let _write_client = fixture.write_client();
let mut management_client = fixture.management_client();
let db_name = rand_name();
// TODO: build an appropriate test DB
let table_name = "test_table";
let delete_predicate = "col = 123";
management_client
.delete(&db_name, table_name, delete_predicate) // note that this function currently does nothing
.await
.unwrap();
// TODO: check the returned delete outcome
}
#[tokio::test]
async fn test_persist_partition() {
use data_types::chunk_metadata::ChunkStorage;