From 63cc7b3fb04b188ef9cee6cb8f48f3f954535594 Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Tue, 14 Sep 2021 17:57:30 -0400
Subject: [PATCH] test: more tests to discover what still needs to be done

---
 predicate/src/predicate.rs               |  12 +--
 query/src/frontend/influxrpc.rs          |   2 +-
 query/src/lib.rs                         |   2 +-
 query/src/provider/physical.rs           |  20 ++--
 query/src/test.rs                        |   4 +-
 server/src/db.rs                         | 111 ++++++++++++++++++++++-
 server/src/db/chunk.rs                   |  52 +++++++----
 server/src/db/lifecycle/persist.rs       |   2 +
 server/src/db/lifecycle/write.rs         |   8 +-
 tests/end_to_end_cases/management_api.rs |   4 +-
 10 files changed, 173 insertions(+), 44 deletions(-)

diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs
index aa3291c73e..1e39cae73e 100644
--- a/predicate/src/predicate.rs
+++ b/predicate/src/predicate.rs
@@ -161,13 +161,15 @@ impl Predicate {
     pub fn is_empty(&self) -> bool {
         self == &EMPTY_PREDICATE
     }
-
-    /// Add each range [start, stop] of the delete_predicates into the predicate in 
+
+    /// Add each range [start, stop] of the delete_predicates into the predicate in
     /// the form "time < start OR time > stop" to eliminate that range from the query
-    pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) { 
+    pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) {
         for pred in delete_predicates {
             if let Some(range) = pred.range {
-                let expr = col(TIME_COLUMN_NAME).lt(lit(range.start)).or(col(TIME_COLUMN_NAME).gt(lit(range.end)));
+                let expr = col(TIME_COLUMN_NAME)
+                    .lt(lit(range.start))
+                    .or(col(TIME_COLUMN_NAME).gt(lit(range.end)));
                 self.exprs.push(expr);
             }
         }
@@ -175,14 +177,12 @@ impl Predicate {

     /// Add a list of not(a delete expr)
     pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) {
-
         for pred in delete_predicates {
             for exp in &pred.exprs {
                 self.exprs.push(exp.clone().not());
             }
         }
     }
-
 }

 impl fmt::Display for Predicate {
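To make the combined effect of `add_delete_ranges` and `add_delete_exprs` concrete, here is a minimal sketch compiled against this repo's `predicate` and `datafusion` crates (the tombstone values are made up for illustration; `Predicate.exprs` are applied conjunctively):

    use datafusion::logical_plan::{col, lit};
    use predicate::predicate::PredicateBuilder;

    fn main() {
        // Hypothetical tombstone: delete rows where bar = 1.0 and 0 <= time <= 15 in "cpu"
        let delete = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 15)
            .add_expr(col("bar").eq(lit(1.0)))
            .build();

        // Folding the tombstone into a query predicate adds, conjunctively:
        //   (time < 0 OR time > 15)  -- from add_delete_ranges
        //   NOT (bar = 1.0)          -- from add_delete_exprs
        // so rows covered by the tombstone no longer match the query.
        let mut query_pred = PredicateBuilder::new().table("cpu").build();
        query_pred.add_delete_ranges(&[delete.clone()]);
        query_pred.add_delete_exprs(&[delete]);
        println!("{}", query_pred);
    }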
diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs
index 59ebd4f6b8..46c0f63134 100644
--- a/query/src/frontend/influxrpc.rs
+++ b/query/src/frontend/influxrpc.rs
@@ -524,7 +524,7 @@ impl InfluxRpcPlanner {
     /// The data is sorted on (tag_col1, tag_col2, ...) so that all
     /// rows for a particular series (groups where all tags are the
     /// same) occur together in the plan
-    // NGA todo: may need to add delete predicate here to emilimate deleted data at read time 
+    // NGA todo: may need to add delete predicate here to eliminate deleted data at read time
     pub fn read_filter<D>(&self, database: &D, predicate: Predicate) -> Result<SeriesSetPlans>
     where
         D: QueryDatabase + 'static,
diff --git a/query/src/lib.rs b/query/src/lib.rs
index 7c1b2337a1..d282bad0cc 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -137,7 +137,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
         &self,
         predicate: &Predicate,
         selection: Selection<'_>,
-        delete_predicates: &Vec<Predicate>,
+        delete_predicates: &[Predicate],
     ) -> Result<SendableRecordBatchStream, Self::Error>;

     /// Returns true if data of this chunk is sorted
diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs
index 4ff5a32fa8..9f4bb0ce5a 100644
--- a/query/src/provider/physical.rs
+++ b/query/src/provider/physical.rs
@@ -116,16 +116,18 @@ impl ExecutionPlan for IOxReadFilterNode {
         let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
         let selection = Selection::Some(&selection_cols);

-        let del_preds= chunk.delete_predicates();
+        let del_preds = chunk.delete_predicates();

-        let stream = chunk.read_filter(&self.predicate, selection, del_preds).map_err(|e| {
-            DataFusionError::Execution(format!(
-                "Error creating scan for table {} chunk {}: {}",
-                self.table_name,
-                chunk.id(),
-                e
-            ))
-        })?;
+        let stream = chunk
+            .read_filter(&self.predicate, selection, del_preds)
+            .map_err(|e| {
+                DataFusionError::Execution(format!(
+                    "Error creating scan for table {} chunk {}: {}",
+                    self.table_name,
+                    chunk.id(),
+                    e
+                ))
+            })?;

         // all CPU time is now done, pass in baseline metrics to adapter
         timer.done();
diff --git a/query/src/test.rs b/query/src/test.rs
index c3f571c902..51051eb647 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -822,7 +822,7 @@ impl QueryChunk for TestChunk {
         &self,
         predicate: &Predicate,
         _selection: Selection<'_>,
-        _delete_predicates: &Vec<Predicate>,
+        _delete_predicates: &[Predicate],
     ) -> Result<SendableRecordBatchStream, Self::Error> {

         self.check_error()?;
@@ -923,7 +923,7 @@ pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
     for c in chunks {
         let pred = Predicate::default();
         let selection = Selection::All;
-        let delete_predicates: Vec<Predicate> = vec!{};
+        let delete_predicates: Vec<Predicate> = vec![];
         let mut stream = c
             .read_filter(&pred, selection, &delete_predicates)
             .expect("Error in read_filter");
diff --git a/server/src/db.rs b/server/src/db.rs
index 1814bb6b4e..69bf2e7a36 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1285,7 +1285,9 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
     use chrono::{DateTime, TimeZone};
+    use datafusion::logical_plan::{col, lit};
     use futures::{stream, StreamExt, TryStreamExt};
+    use predicate::predicate::PredicateBuilder;
     use tokio_util::sync::CancellationToken;

     use ::test_helpers::assert_contains;
@@ -1920,7 +1922,7 @@ mod tests {

     async fn collect_read_filter(chunk: &DbChunk) -> Vec<RecordBatch> {
         chunk
-            .read_filter(&Default::default(), Selection::All, &vec![])
+            .read_filter(&Default::default(), Selection::All, &[])
             .unwrap()
             .collect::<Vec<_>>()
             .await
@@ -3782,4 +3784,111 @@ mod tests {
             .await
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn test_delete_open_mub() {
+        ::test_helpers::maybe_start_logging();
+
+        let db = make_db().await.db;
+        // load lines at times 10 and 20 nanoseconds; they belong to the partition starting at 0 nanoseconds, aka "1970-01-01T00"
+        write_lp(db.as_ref(), "cpu bar=1 10").await;
+        write_lp(db.as_ref(), "cpu bar=2 20").await;
+        let partition_key = "1970-01-01T00";
+        assert_eq!(vec![partition_key], db.partition_keys().unwrap());
+
+        // ----
+        // There is one open MUB chunk
+        // Let's query it
+        let expected = vec![
+            "+-----+--------------------------------+",
+            "| bar | time                           |",
+            "+-----+--------------------------------+",
+            "| 1   | 1970-01-01T00:00:00.000000010Z |",
+            "| 2   | 1970-01-01T00:00:00.000000020Z |",
+            "+-----+--------------------------------+",
+        ];
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches);
+
+        // ---
+        // Delete the bar=1 row
+        let i: f64 = 1.0;
+        let expr = col("bar").eq(lit(i));
+        let pred = PredicateBuilder::new()
+            .table("cpu")
+            .timestamp_range(0, 15)
+            .add_expr(expr)
+            .build();
+        db.delete("cpu", &pred).await.unwrap();
+        // When the above delete is issued, the open MUB chunk is frozen with the delete predicate attached
+
+        // Let's query again
+        // NGA todo: since the MUB does not filter data, the filtering needs to be done at the scan step
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches); // this fails once the todo is done
+
+        // --
+        // Since the chunk was frozen when the delete was issued, nothing more will be frozen
+        let mb_open_chunk = db.rollover_partition("cpu", partition_key).await.unwrap();
+        assert!(mb_open_chunk.is_none());
+
+        // ---
+        // Let's move the MUB to the RUB. The delete predicate will be included in the RUB
+        let mb_chunk_id = 0;
+        let _mb_chunk = db.chunk("cpu", partition_key, mb_chunk_id).unwrap();
+        let _rb_chunk = db
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk_id)
+            .await
+            .unwrap();
+        // Let's query again; since the RUB supports delete predicates, we should see only 1 row
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        let expected = vec![
+            "+-----+--------------------------------+",
+            "| bar | time                           |",
+            "+-----+--------------------------------+",
+            "| 2   | 1970-01-01T00:00:00.000000020Z |",
+            "+-----+--------------------------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+
+        // ---
+        // Write the RB chunk to Object Store but keep it in the RB
+        let pq_chunk = db
+            .persist_partition(
+                "cpu",
+                partition_key,
+                Instant::now() + Duration::from_secs(1),
+            )
+            .await
+            .unwrap();
+
+        let pq_chunk_id = pq_chunk.id();
+        // we should have chunks in both the read buffer and parquet
+        assert!(mutable_chunk_ids(&db, partition_key).is_empty());
+        assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![pq_chunk_id]);
+        assert_eq!(
+            read_parquet_file_chunk_ids(&db, partition_key),
+            vec![pq_chunk_id]
+        );
+        // Let's query again from the RUB; we should still see only 1 row
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches);
+
+        // ---
+        // Unload the RB chunk but keep it in OS
+        let _pq_chunk = db
+            .unload_read_buffer("cpu", partition_key, pq_chunk_id)
+            .unwrap();
+
+        // we should only have the chunk in OS
+        assert!(mutable_chunk_ids(&db, partition_key).is_empty());
+        assert!(read_buffer_chunk_ids(&db, partition_key).is_empty());
+        assert_eq!(
+            read_parquet_file_chunk_ids(&db, partition_key),
+            vec![pq_chunk_id]
+        );
+        // Let's query again, but this time from OS
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches);
+    }
 }
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 360e71389d..d403368f5f 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -5,7 +5,11 @@ use chrono::{DateTime, Utc};
 use data_types::{chunk_metadata::ChunkOrder, partition_metadata};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::MemoryStream;
-use internal_types::{access::AccessRecorder, schema::{Schema, sort::SortKey}, selection::Selection};
+use internal_types::{
+    access::AccessRecorder,
+    schema::{sort::SortKey, Schema},
+    selection::Selection,
+};
 use iox_object_store::ParquetFilePath;
 use mutable_buffer::chunk::snapshot::ChunkSnapshot;
 use observability_deps::tracing::debug;
@@ -221,13 +225,16 @@ impl DbChunk {
         self.time_of_last_write
     }

-    pub fn to_rub_negated_predicates(delete_predicates: &Vec<Predicate>) -> Result<Vec<read_buffer::Predicate>> {
+    pub fn to_rub_negated_predicates(
+        delete_predicates: &[Predicate],
+    ) -> Result<Vec<read_buffer::Predicate>> {
         let mut rub_preds: Vec<read_buffer::Predicate> = vec![];
         for pred in delete_predicates {
             let rub_pred = to_read_buffer_predicate(pred).context(PredicateConversion)?;
             rub_preds.push(rub_pred);
         }

+        debug!(?rub_preds, "RUB delete predicates");
         Ok(rub_preds)
     }
 }
@@ -324,7 +331,7 @@ impl QueryChunk for DbChunk {
         &self,
         predicate: &Predicate,
         selection: Selection<'_>,
-        delete_predicates: &Vec<Predicate>,
+        delete_predicates: &[Predicate],
     ) -> Result<SendableRecordBatchStream, Self::Error> {
         // Predicate is not required to be applied for correctness. We only pushed it down
         // when possible for performance gain
@@ -332,15 +339,23 @@ impl QueryChunk for DbChunk {
         debug!(?predicate, "Input Predicate to read_filter");
         self.access_recorder.record_access_now();

+        debug!(?delete_predicates, "Input Delete Predicates to read_filter");
+
         // add negated deleted ranges to the predicate
         let mut pred_with_deleted_ranges = predicate.clone();
         pred_with_deleted_ranges.add_delete_ranges(delete_predicates);
-        debug!(?pred_with_deleted_ranges, "Input Predicate plus deleted ranges");
+        debug!(
+            ?pred_with_deleted_ranges,
+            "Input Predicate plus deleted ranges"
+        );

         // add negated deleted predicates
-        let mut pred_wth_deleted_exprs = pred_with_deleted_ranges.clone(); 
+        let mut pred_wth_deleted_exprs = pred_with_deleted_ranges.clone();
         pred_wth_deleted_exprs.add_delete_exprs(delete_predicates);
-        debug!(?pred_wth_deleted_exprs, "Input Predicate plus deleted ranges and deleted predicates");
+        debug!(
+            ?pred_wth_deleted_exprs,
+            "Input Predicate plus deleted ranges and deleted predicates"
+        );

         match &self.state {
             State::MutableBuffer { chunk, .. } => {
@@ -350,11 +365,12 @@ impl QueryChunk for DbChunk {
             }
             State::ReadBuffer { chunk, .. } => {
                 // Only apply pushdownable predicates
-                let rb_predicate =
-                    match to_read_buffer_predicate(&pred_with_deleted_ranges).context(PredicateConversion) {
-                        Ok(predicate) => predicate,
-                        Err(_) => read_buffer::Predicate::default(),
-                    };
+                let rb_predicate = match to_read_buffer_predicate(&pred_with_deleted_ranges)
+                    .context(PredicateConversion)
+                {
+                    Ok(predicate) => predicate,
+                    Err(_) => read_buffer::Predicate::default(),
+                };

                 // combine all delete expressions to RUB's negated ones
                 let negated_delete_exprs = Self::to_rub_negated_predicates(delete_predicates)?;
@@ -374,13 +390,11 @@ impl QueryChunk for DbChunk {
                     schema.into(),
                 )))
             }
-            State::ParquetFile { chunk, .. } => {
-                chunk
-                    .read_filter(&pred_wth_deleted_exprs, selection)
-                    .context(ParquetFileChunkError {
-                        chunk_id: self.id(),
-                    })
-            }
+            State::ParquetFile { chunk, .. } => chunk
+                .read_filter(&pred_wth_deleted_exprs, selection)
+                .context(ParquetFileChunkError {
+                    chunk_id: self.id(),
+                }),
         }
     }

@@ -544,7 +558,7 @@ mod tests {
         let t2 = chunk.access_recorder().get_metrics();

         snapshot
-            .read_filter(&Default::default(), Selection::All, &vec![])
+            .read_filter(&Default::default(), Selection::All, &[])
             .unwrap();
         let t3 = chunk.access_recorder().get_metrics();
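Two things are worth pulling out of the chunk.rs hunks above. First, in the `ReadBuffer` arm, a query predicate that cannot be converted for pushdown silently degrades to `read_buffer::Predicate::default()` instead of failing: pushdown is purely an optimization, and the delete expressions still reach the chunk separately through `to_rub_negated_predicates`. Second, together with the new `test_delete_open_mub` in `server/src/db.rs`, the match arms encode which storage stage already honors delete predicates today. A standalone sketch of those expectations (the `Stage` enum is illustrative only, not an IOx type):

    // Which stages filter deleted rows at query time, per test_delete_open_mub.
    #[derive(Debug, Clone, Copy)]
    enum Stage {
        OpenMub,     // frozen on delete, but rows are not yet filtered (see NGA todo)
        ReadBuffer,  // delete predicate applied: the deleted row disappears
        ObjectStore, // persisted result stays filtered, with or without the RUB copy
    }

    fn filters_deletes(stage: Stage) -> bool {
        match stage {
            Stage::OpenMub => false,
            Stage::ReadBuffer | Stage::ObjectStore => true,
        }
    }

    fn main() {
        for stage in &[Stage::OpenMub, Stage::ReadBuffer, Stage::ObjectStore] {
            println!("{:?} filters deletes: {}", stage, filters_deletes(*stage));
        }
    }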
diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs
index b990f22f8a..e8e98d1719 100644
--- a/server/src/db/lifecycle/persist.rs
+++ b/server/src/db/lifecycle/persist.rs
@@ -144,6 +144,8 @@ pub fn persist_chunks(
         );
     }

+    // NGA todo: we hit this error if there are rows but they have all been deleted - todo before merge: open a ticket
+    // Need to think of a way to handle this
     let to_persist = to_persist.expect("should be rows to persist");

     let (new_chunk_id, new_chunk) = partition_write.create_rub_chunk(
diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs
index b81d737064..e65cd365b7 100644
--- a/server/src/db/lifecycle/write.rs
+++ b/server/src/db/lifecycle/write.rs
@@ -87,11 +87,13 @@ pub(super) fn write_chunk_to_object_store(
     let (partition_checkpoint, database_checkpoint) =
         collect_checkpoints(flush_handle.checkpoint(), &db.catalog);

-
-
     // Get RecordBatchStream of data from the read buffer chunk
     let stream = db_chunk
-        .read_filter(&Default::default(), Selection::All, db_chunk.delete_predicates())
+        .read_filter(
+            &Default::default(),
+            Selection::All,
+            db_chunk.delete_predicates(),
+        )
         .expect("read filter should be infallible");

     // check that the upcoming state change will very likely succeed
diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs
index b0c592b08f..78b82fec0f 100644
--- a/tests/end_to_end_cases/management_api.rs
+++ b/tests/end_to_end_cases/management_api.rs
@@ -1491,8 +1491,8 @@ async fn test_delete() {
     // Delete some data
     let table = "cpu";
     let start = "0";
-    let stop = "12000000000000";
-    let pred = "region = west";
+    let stop = "1200";
+    let pred = r#"region = "west""#;
     let _del = management_client
         .delete(db_name.clone(), table, start, stop, pred)
         .await
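The end-to-end fix above is one of scale and quoting: the delete window's `stop` drops from `12000000000000` to `1200`, presumably to match the raw nanosecond timestamps the test actually writes, and the string value in the predicate gains quotes (`region = "west"`) as the delete grammar expects. The `r#"..."#` raw-string syntax only keeps the inner quotes readable; a standalone check:

    fn main() {
        // Both spellings denote the same predicate text.
        let raw = r#"region = "west""#;
        let escaped = "region = \"west\"";
        assert_eq!(raw, escaped);
    }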