feat: more progress on the delete flow from grpc API to catalog chunks

pull/24376/head
Nga Tran 2021-08-31 17:42:07 -04:00
parent f962d0ef2e
commit a4183de411
12 changed files with 168 additions and 28 deletions

View File

@ -448,6 +448,5 @@ message DeleteRequest {
}
message DeleteResponse {
  // NGA todo: decide whether the response should carry anything back to the
  // client, e.g. a status string or a handle to the delete operation:
  // string delete_status = 1;
}

View File

@ -360,7 +360,21 @@ pub enum DropPartitionError {
/// Errors returned by [`Client::delete`]
///
/// Each variant maps a gRPC status code received from the server to a
/// typed client-side error (see the status-code match in `Client::delete`).
#[derive(Debug, Error)]
pub enum DeleteError {
    /// Database not found (mapped from gRPC `NotFound` status)
    #[error("Not found: {}", .0)]
    NotFound(String),

    /// Response contained no payload
    #[error("Server returned an empty response")]
    EmptyResponse,

    /// Server indicated that it is not (yet) available
    /// (mapped from gRPC `Unavailable` status)
    #[error("Server unavailable: {}", .0.message())]
    Unavailable(tonic::Status),

    /// Client received an unexpected error from the server
    /// (any other gRPC status code)
    #[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
    ServerError(tonic::Status),
}
/// Errors returned by [`Client::persist_partition`]
#[derive(Debug, Error)]
@ -932,13 +946,33 @@ impl Client {
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
delete_predicate: impl Into<String> + Send,
start_time: impl Into<String> + Send,
stop_time: impl Into<String> + Send,
) -> Result<(), DeleteError> {
let _db_name = db_name.into();
let _table_name = table_name.into();
let _delete_predicate = delete_predicate.into();
let db_name = db_name.into();
let table_name = table_name.into();
let delete_predicate = delete_predicate.into();
let start_time = start_time.into();
let stop_time = stop_time.into();
// todo
// NGA todo: Should parse and validate start_time, stop_time, and delete_predicate here
// in the client, or send them to the server and do the parsing and validation there?
self.inner
.delete(DeleteRequest {
db_name,
table_name,
delete_predicate,
start_time,
stop_time,
})
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => DeleteError::NotFound(status.message().to_string()),
tonic::Code::Unavailable => DeleteError::Unavailable(status),
_ => DeleteError::ServerError(status),
})?;
// NGA todo: return a handle to the delete?
Ok(())
}

View File

@ -711,7 +711,7 @@ fn integral_value_signed(i: &str) -> IResult<&str, &str> {
recognize(preceded(opt(tag("-")), digit1))(i)
}
fn timestamp(i: &str) -> IResult<&str, i64> {
pub fn timestamp(i: &str) -> IResult<&str, i64> {
map_fail(integral_value_signed, |value| {
value.parse().context(TimestampValueInvalid { value })
})(i)
@ -1693,6 +1693,36 @@ bar value2=2i 123"#;
assert_eq!(vals[0].field_set[0].1.unwrap_i64(), 1);
}
#[test]
fn parse_timestamp() {
    // A plain integer literal parses to its i64 value.
    let (_rest, value) = timestamp(r#"123"#).unwrap();
    assert_eq!(value, 123);
}
#[test]
fn parse_timestamp_negative() {
    // A leading minus sign is accepted and yields a negative i64.
    let (_rest, value) = timestamp(r#"-123"#).unwrap();
    assert_eq!(value, -123);
}
#[test]
fn parse_timestamp_out_of_range() {
    // A value that overflows i64 must fail with TimestampValueInvalid,
    // surfaced as a non-recoverable nom failure.
    let result = timestamp(r#"99999999999999999999999999999999"#);
    let got_expected_error = matches!(
        result,
        Err(nom::Err::Failure(
            super::Error::TimestampValueInvalid { .. }
        ))
    );
    assert!(got_expected_error, "Wrong error: {:?}", result);
}
#[test]
fn parse_negative_timestamp() {
let input = r#"foo value1=1i -123"#;

View File

@ -47,6 +47,9 @@ pub trait QueryChunkMeta: Sized {
/// return a reference to the schema of the data held in this chunk
fn schema(&self) -> Arc<Schema>;
/// Return a reference to the delete predicates of the chunk
fn delete_predicates(&self) -> Arc<Vec<Predicate>>;
}
/// A `Database` is the main trait implemented by the IOx subsystems
@ -161,6 +164,10 @@ where
/// Delegate to the wrapped value's `schema` implementation.
fn schema(&self) -> Arc<Schema> {
    self.as_ref().schema()
}
/// Delegate to the wrapped value's `delete_predicates` implementation.
fn delete_predicates(&self) -> Arc<Vec<Predicate>> {
    self.as_ref().delete_predicates()
}
}
/// Compute a sort key that orders lower cardinality columns first

View File

@ -501,7 +501,6 @@ mod tests {
assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
assert_eq!(predicate.exprs[7], lit(5).eq(col("city")));
}
#[test]
fn predicate_display_ts() {
// TODO make this a doc example?

View File

@ -897,6 +897,11 @@ impl QueryChunkMeta for TestChunk {
/// Return a shared handle to this test chunk's schema.
fn schema(&self) -> Arc<Schema> {
    Arc::clone(&self.schema)
}
/// Test chunks do not track delete predicates; always return an empty list.
fn delete_predicates(&self) -> Arc<Vec<Predicate>> {
    Arc::new(Vec::new())
}
}
/// Return the raw data from the list of chunks

View File

@ -9,7 +9,7 @@ use crate::{
chunk::{CatalogChunk, ChunkStage},
partition::Partition,
table::TableSchemaUpsertHandle,
Catalog, TableNameFilter,
Catalog, Error as CatalogError, TableNameFilter,
},
lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb},
},
@ -116,6 +116,16 @@ pub enum Error {
source: mutable_buffer::chunk::Error,
},
#[snafu(display(
"Cannot delete data from non-existing table, {}: {}",
table_name,
source
))]
DeleteFromTable {
table_name: String,
source: CatalogError,
},
#[snafu(display(
"Storing sequenced entry failed with the following error(s), and possibly more: {}",
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
@ -634,16 +644,22 @@ impl Db {
/// Delete data from a table on a specified predicate
pub async fn delete(
self: &Arc<Self>,
_table_name: &str,
_delete_predicate: &str, //todo: this might be a Predicate data type
table_name: &str,
delete_predicate: &Predicate,
) -> Result<()> {
let partitions = self.catalog.partitions();
for partition in &partitions {
// get all partitions of this table
let table = self
.catalog
.table(table_name)
.context(DeleteFromTable { table_name })?;
let partitions = table.partitions();
for partition in partitions {
let partition = partition.write();
let chunks = partition.chunks();
for _chunk in chunks {
// todo
// if this is the chunk of the table, add delete_predicate into the chunk's delete_predicates
for chunk in chunks {
// NGA todo: verify where to close MUB before adding the predicate
let mut chunk = chunk.write();
chunk.add_delete_predicate(delete_predicate);
}
}

View File

@ -77,7 +77,7 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
/// Delete predicates of this chunk
pub delete_predicates: Arc<Vec<Predicate>>,
pub delete_predicates: Arc<Vec<Predicate>>,
}
/// Different memory representations of a frozen chunk.
@ -309,7 +309,7 @@ impl CatalogChunk {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(chunk.table_summary()),
schema,
delete_predicates: Arc::new(vec![]), //todo: consider to use the one of the given chunk if appropriate
delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
@ -464,6 +464,34 @@ impl CatalogChunk {
}
}
/// Register a delete predicate with this chunk.
///
/// NOTE(review): currently a stub — every chunk-stage arm below is an empty
/// placeholder, so the predicate is accepted and then dropped (hence the
/// unused `_delete_predicate` binding). The per-stage handling is still to
/// be implemented.
pub fn add_delete_predicate(&mut self, _delete_predicate: &Predicate) {
    match &self.stage {
        ChunkStage::Open { mb_chunk: _ } => {
            // NGA todo:
            // Close the MUB
            // Add the delete_predicate to it
        }
        ChunkStage::Frozen { representation, .. } => match representation {
            ChunkStageFrozenRepr::MutableBufferSnapshot(_snapshot) => {
                // NGA todo
            }
            ChunkStageFrozenRepr::ReadBuffer(_rb_chunk) => {
                // NGA todo
            }
        },
        // Persisted chunk that still has a loaded read buffer.
        ChunkStage::Persisted {
            parquet: _,
            read_buffer: Some(_read_buffer),
            ..
        } => {
            // NGA todo
        }
        // Persisted chunk with no read buffer (parquet only).
        ChunkStage::Persisted { parquet: _, .. } => {
            // NGA todo
        }
    }
}
/// Record a write of row data to this chunk
///
/// `time_of_write` is the wall clock time of the write

View File

@ -490,6 +490,11 @@ impl QueryChunkMeta for DbChunk {
/// Return a shared handle to the schema stored in this chunk's metadata.
fn schema(&self) -> Arc<Schema> {
    Arc::clone(&self.meta.schema)
}
/// Return a shared handle to the delete predicates stored in this chunk's
/// metadata.
fn delete_predicates(&self) -> Arc<Vec<Predicate>> {
    Arc::clone(&self.meta.delete_predicates)
}
}
#[cfg(test)]

View File

@ -189,7 +189,7 @@ enum Command {
/// Unload chunk from read buffer but keep it in object store.
UnloadChunk(UnloadChunk),
// Debating: Should we add the Delete command that deletes data for table of this partition?
// NGA:todo - Debating: Should we add the Delete command that deletes data for table of this partition?
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {

View File

@ -6,6 +6,7 @@ use std::time::Instant;
use data_types::{server_id::ServerId, DatabaseName};
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
use query::predicate::PredicateBuilder;
use query::QueryDatabase;
use server::rules::ProvidedDatabaseRules;
use server::{ApplicationState, ConnectionManager, Error, Server};
@ -560,11 +561,22 @@ where
let DeleteRequest {
db_name,
table_name,
delete_predicate,
start_time: _,
stop_time: _,
delete_predicate: _,
start_time,
stop_time,
} = request.into_inner();
use influxdb_line_protocol::timestamp;
// Parse and Validate start time and stop time
let start = timestamp(start_time.as_str()).unwrap().1;
let stop = timestamp(stop_time.as_str()).unwrap().1;
assert!(start < stop, "Stop time has to be after start time");
// parse and validate delete predicate which is a conjunctive expressions
// with columns being compared to literals using = or != operators
// NGA: todo
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).field("db_name")?;
let db = self
@ -572,14 +584,17 @@ where
.db(&db_name)
.map_err(default_server_error_handler)?;
// Todo
// Convert start_time and stop_time to time range
// and make a new predicate that is a conjunction of the time range and the delete_predicate
// Build the delete predicate that include all delete expression and time range
let del_predicate = PredicateBuilder::new()
.timestamp_range(start, stop)
//.add_expr // NGA todo: repeat to add delete expressions here
.build();
db.delete(&table_name, &delete_predicate)
db.delete(&table_name, &del_predicate)
.await
.map_err(default_db_error_handler)?;
// NGA todo: return a delete handle with the response?
Ok(Response::new(DeleteResponse {}))
}
}

View File

@ -1383,9 +1383,11 @@ async fn test_delete() {
// Build an appropriate test DB
let table_name = "test_table";
let delete_predicate = "col = 123";
let start = "100";
let stop = "1000";
management_client
.delete(&db_name, table_name, delete_predicate) // note that this function currently does nothing
.delete(&db_name, table_name, delete_predicate, start, stop) // note that this function currently does nothing
.await
.unwrap();