diff --git a/server/src/db.rs b/server/src/db.rs index ca354e7831..db1e4388ea 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -4,6 +4,7 @@ pub(crate) use crate::db::chunk::DbChunk; use crate::{ db::{ + self, access::QueryCatalogAccess, catalog::{ chunk::{CatalogChunk, ChunkStage}, @@ -40,7 +41,7 @@ use parquet_file::catalog::{ use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows}; use query::{ exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}, - predicate::Predicate, + predicate::{Predicate, PredicateBuilder}, QueryDatabase, }; use rand_distr::{Distribution, Poisson}; @@ -125,6 +126,9 @@ pub enum Error { source: CatalogError, }, + #[snafu(display("Internal error while adding predicate predicate to chunk: {}", source))] + AddDeletePredicateError { source: db::catalog::chunk::Error }, + #[snafu(display( "Storing sequenced entry failed with the following error(s), and possibly more: {}", errors.iter().map(ToString::to_string).collect::>().join(", ") @@ -551,7 +555,9 @@ impl Db { info!(%table_name, %partition_key, found_chunk=chunk.is_some(), "rolling over a partition"); if let Some(chunk) = chunk { let mut chunk = chunk.write(); - chunk.freeze().context(FreezingChunk)?; + chunk + .freeze(&PredicateBuilder::default().build()) + .context(FreezingChunk)?; Ok(Some(DbChunk::snapshot(&chunk))) } else { @@ -653,9 +659,11 @@ impl Db { let partition = partition.write(); let chunks = partition.chunks(); for chunk in chunks { - // NGA todo: verify where to close MUB before adding the predicate + // save the delete predicate in the chunk let mut chunk = chunk.write(); - chunk.add_delete_predicate(delete_predicate); + chunk + .add_delete_predicate(delete_predicate) + .context(AddDeletePredicateError)?; } } @@ -1304,7 +1312,9 @@ impl Db { let handle_chunk_write = |chunk: &mut CatalogChunk| { chunk.record_write(time_of_write, &timestamp_summary); if chunk.storage().0 >= mub_row_threshold.get() { -
chunk.freeze().expect("freeze mub chunk"); + chunk + .freeze(&PredicateBuilder::default().build()) + .expect("freeze mub chunk"); } }; diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 1fa615898f..60a78c7a88 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -13,7 +13,7 @@ use internal_types::{access::AccessRecorder, schema::Schema}; use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk}; use observability_deps::tracing::debug; use parquet_file::chunk::ParquetChunk; -use query::predicate::Predicate; +use query::predicate::{Predicate, PredicateBuilder}; use read_buffer::RBChunk; use snafu::Snafu; use std::sync::Arc; @@ -295,12 +295,13 @@ impl CatalogChunk { time_of_last_write: DateTime, schema: Arc, metrics: ChunkMetrics, + delete_predicates: Arc>, ) -> Self { let stage = ChunkStage::Frozen { meta: Arc::new(ChunkMetadata { table_summary: Arc::new(chunk.table_summary()), schema, - delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate + delete_predicates: Arc::clone(&delete_predicates), }), representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)), }; @@ -327,6 +328,7 @@ impl CatalogChunk { time_of_first_write: DateTime, time_of_last_write: DateTime, metrics: ChunkMetrics, + delete_predicates: Arc>, ) -> Self { assert_eq!(chunk.table_name(), addr.table_name.as_ref()); @@ -334,7 +336,7 @@ impl CatalogChunk { let meta = Arc::new(ChunkMetadata { table_summary: Arc::clone(chunk.table_summary()), schema: chunk.schema(), - delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate + delete_predicates: Arc::clone(&delete_predicates), }); let stage = ChunkStage::Persisted { @@ -451,31 +453,45 @@ impl CatalogChunk { } } - pub fn add_delete_predicate(&mut self, _delete_predicate: &Predicate) { + pub fn add_delete_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> { + 
match &mut self.stage { + ChunkStage::Open { mb_chunk: _ } => { + // Freeze/close this chunk and add delete_predicate to its frozen one + self.freeze(delete_predicate)?; + } + ChunkStage::Frozen { meta, .. } => { + // Add the delete_predicate into the chunk's metadata + let mut del_preds: Vec = (*meta.delete_predicates).clone(); + del_preds.push(delete_predicate.clone()); + *meta = Arc::new(ChunkMetadata { + table_summary: Arc::clone(&meta.table_summary), + schema: Arc::clone(&meta.schema), + delete_predicates: Arc::new(del_preds), + }); + } + ChunkStage::Persisted { meta, .. } => { + // Add the delete_predicate into the chunk's metadata + let mut del_preds: Vec = (*meta.delete_predicates).clone(); + del_preds.push(delete_predicate.clone()); + *meta = Arc::new(ChunkMetadata { + table_summary: Arc::clone(&meta.table_summary), + schema: Arc::clone(&meta.schema), + delete_predicates: Arc::new(del_preds), + }); + } + } + + Ok(()) + } + + pub fn delete_predicates(&mut self) -> Arc> { match &self.stage { ChunkStage::Open { mb_chunk: _ } => { - // NGA todo: - // Close the MUB - // Add the delete_predicate to it - } - ChunkStage::Frozen { representation, .. } => match representation { - ChunkStageFrozenRepr::MutableBufferSnapshot(_snapshot) => { - // NGA todo - } - ChunkStageFrozenRepr::ReadBuffer(_rb_chunk) => { - // NGA todo - } - }, - ChunkStage::Persisted { - parquet: _, - read_buffer: Some(_read_buffer), - .. - } => { - // NGA todo - } - ChunkStage::Persisted { parquet: _, .. } => { - // NGA todo + // no delete predicate for open chunk + Arc::new(vec![]) } + ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.delete_predicates), + ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.delete_predicates), } } @@ -643,7 +659,7 @@ impl CatalogChunk { /// /// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage /// (no-op) and will fail for other stages. 
- pub fn freeze(&mut self) -> Result<()> { + pub fn freeze(&mut self, delete_predicate: &Predicate) -> Result<()> { match &self.stage { ChunkStage::Open { mb_chunk, .. } => { debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk"); @@ -656,7 +672,7 @@ impl CatalogChunk { let metadata = ChunkMetadata { table_summary: Arc::new(mb_chunk.table_summary()), schema: s.full_schema(), - delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the mb_chunk if appropriate + delete_predicates: Arc::new(vec![delete_predicate.clone()]), }; self.stage = ChunkStage::Frozen { @@ -684,7 +700,7 @@ impl CatalogChunk { // This ensures the closing logic is consistent but doesn't break code that // assumes a chunk can be moved from open if matches!(self.stage, ChunkStage::Open { .. }) { - self.freeze()?; + self.freeze(&PredicateBuilder::default().build())?; } match &self.stage { @@ -714,7 +730,7 @@ impl CatalogChunk { match &self.stage { ChunkStage::Open { .. } | ChunkStage::Frozen { .. } => { self.set_lifecycle_action(ChunkLifecycleAction::Compacting, registration)?; - self.freeze()?; + self.freeze(&PredicateBuilder::default().build())?; Ok(()) } ChunkStage::Persisted { .. } => { @@ -736,7 +752,7 @@ impl CatalogChunk { *meta = Arc::new(ChunkMetadata { table_summary: Arc::clone(&meta.table_summary), schema, - delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate + delete_predicates: Arc::clone(&meta.delete_predicates), }); match &representation { @@ -766,7 +782,7 @@ impl CatalogChunk { // This ensures the closing logic is consistent but doesn't break code that // assumes a chunk can be moved from open if matches!(self.stage, ChunkStage::Open { .. 
}) { - self.freeze()?; + self.freeze(&PredicateBuilder::default().build())?; } match &self.stage { @@ -922,6 +938,7 @@ impl CatalogChunk { #[cfg(test)] mod tests { use super::*; + use datafusion::logical_plan::{col, lit}; use entry::test_helpers::lp_to_entry; use mutable_buffer::chunk::ChunkMetrics as MBChunkMetrics; use parquet_file::{ @@ -930,6 +947,7 @@ mod tests { make_chunk as make_parquet_chunk_with_store, make_iox_object_store, TestSize, }, }; + use query::predicate::PredicateBuilder; #[test] fn test_new_open() { @@ -942,17 +960,18 @@ mod tests { let mut chunk = make_open_chunk(); // close it - chunk.freeze().unwrap(); + let delete_pred = PredicateBuilder::default().build(); + chunk.freeze(&delete_pred).unwrap(); assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. })); // closing a second time is a no-op - chunk.freeze().unwrap(); + chunk.freeze(&delete_pred).unwrap(); assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. })); // closing a chunk in persisted state will fail let mut chunk = make_persisted_chunk().await; assert_eq!( - chunk.freeze().unwrap_err().to_string(), + chunk.freeze(&delete_pred).unwrap_err().to_string(), "Internal Error: unexpected chunk state for Chunk('db':'table1':'part1':0) \ during setting closed. 
Expected Open or Frozen, got Persisted" ); @@ -1086,6 +1105,75 @@ mod tests { chunk.clear_lifecycle_action().unwrap(); } + #[test] + fn test_add_delete_predicate_open_chunk() { + let mut chunk = make_open_chunk(); + let registration = TaskRegistration::new(); + + // no delete_predicate yet + let del_preds = chunk.delete_predicates(); + assert_eq!(del_preds.len(), 0); + + // Build delete predicate and expected output + let expr1 = col("city").eq(lit("Boston")); + let del_pred1 = PredicateBuilder::new() + .table("test") + .timestamp_range(1, 100) + .add_expr(expr1) + .build(); + let mut expected_exprs1 = vec![]; + let e = col("city").eq(lit("Boston")); + expected_exprs1.push(e); + + // Add a delete predicate to the open chunk = delete simulation for open chunk + chunk.add_delete_predicate(&del_pred1).unwrap(); + // chunk must be in frozen stage now + assert_eq!(chunk.stage().name(), "Frozen"); + // chunk must have a delete predicate + let del_preds = chunk.delete_predicates(); + assert_eq!(del_preds.len(), 1); + // verify delete predicate value + let pred = &del_preds[0]; + if let Some(range) = pred.range { + assert_eq!(range.start, 1); // start time + assert_eq!(range.end, 100); // end time + } else { + panic!("No time range set for delete predicate"); + } + assert_eq!(pred.exprs, expected_exprs1); + + // Move the chunk + chunk.set_moving(&registration).unwrap(); + // The chunk still should be frozen + assert_eq!(chunk.stage().name(), "Frozen"); + // let's add one more delete predicate = simulate second delete + // Build delete predicate and expected output + let expr2 = col("cost").not_eq(lit(15)); + let del_pred2 = PredicateBuilder::new() + .table("test") + .timestamp_range(20, 50) + .add_expr(expr2) + .build(); + let mut expected_exprs2 = vec![]; + let e = col("cost").not_eq(lit(15)); + expected_exprs2.push(e); + chunk.add_delete_predicate(&del_pred2).unwrap(); + // chunk still must be in frozen stage now + assert_eq!(chunk.stage().name(), "Frozen"); + //
chunk must have 2 delete predicates + let del_preds = chunk.delete_predicates(); + assert_eq!(del_preds.len(), 2); + // verify the second delete predicate value + let pred = &del_preds[1]; + if let Some(range) = pred.range { + assert_eq!(range.start, 20); // start time + assert_eq!(range.end, 50); // end time + } else { + panic!("No time range set for delete predicate"); + } + assert_eq!(pred.exprs, expected_exprs2); + } + fn make_mb_chunk(table_name: &str) -> MBChunk { let entry = lp_to_entry(&format!("{} bar=1 10", table_name)); let write = entry.partition_writes().unwrap().remove(0); @@ -1134,6 +1222,7 @@ mod tests { now, now, ChunkMetrics::new_unregistered(), + Arc::new(vec![] as Vec), ) } } diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 1afffd70ff..5d40aa5945 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -12,6 +12,7 @@ use observability_deps::tracing::info; use persistence_windows::{ min_max_sequence::OptionalMinMaxSequence, persistence_windows::PersistenceWindows, }; +use query::predicate::Predicate; use snafu::Snafu; use std::{ collections::{btree_map::Entry, BTreeMap}, @@ -160,6 +161,7 @@ impl Partition { time_of_first_write: DateTime, time_of_last_write: DateTime, schema: Arc, + delete_predicates: Arc>, ) -> Arc> { let chunk_id = self.next_chunk_id; assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow"); @@ -175,6 +177,7 @@ impl Partition { time_of_last_write, schema, self.metrics.new_chunk_metrics(), + delete_predicates, ))); if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() { @@ -195,6 +198,7 @@ impl Partition { chunk: Arc, time_of_first_write: DateTime, time_of_last_write: DateTime, + delete_predicates: Arc>, ) -> Arc> { assert_eq!(chunk.table_name(), self.table_name()); @@ -208,6 +212,7 @@ impl Partition { time_of_first_write, time_of_last_write, self.metrics.new_chunk_metrics(), + Arc::clone(&delete_predicates), )), ); @@ -359,6 +364,7 @@ mod
tests { let t = Utc::now(); let schema = SchemaBuilder::new().timestamp().build().unwrap(); let schema = Arc::new(schema); + let delete_predicates: Arc> = Arc::new(vec![]); let rb = RecordBatch::try_new( schema.as_arrow(), vec![Arc::new(TimestampNanosecondArray::from_iter_values([ @@ -374,18 +380,21 @@ mod tests { t, t, Arc::clone(&schema), + Arc::clone(&delete_predicates), ); partition.create_rub_chunk( RBChunk::new("t", rb.clone(), ChunkMetrics::new(&domain)), t, t, Arc::clone(&schema), + Arc::clone(&delete_predicates), ); partition.create_rub_chunk( RBChunk::new("t", rb, ChunkMetrics::new(&domain)), t, t, Arc::clone(&schema), + Arc::clone(&delete_predicates), ); // should be in ascending order diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 4f6c6be062..5bde87d787 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -314,6 +314,7 @@ impl QueryChunk for DbChunk { Ok(pred_result) } + // NGA todo: add delete predicate here to eliminate data at query time fn read_filter( &self, predicate: &Predicate, diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index de27c5229a..635e567d02 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -10,7 +10,10 @@ use chrono::{DateTime, Utc}; use data_types::job::Job; use lifecycle::LifecycleWriteGuard; use observability_deps::tracing::info; -use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::{ + compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, predicate::Predicate, + QueryChunkMeta, +}; use std::{future::Future, sync::Arc}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -39,6 +42,7 @@ pub(crate) fn compact_chunks( let mut input_rows = 0; let mut time_of_first_write: Option> = None; let mut time_of_last_write: Option> = None; + let mut delete_predicates: Arc> = Arc::new(vec![]); let query_chunks = chunks .into_iter() .map(|mut chunk| { 
@@ -58,6 +62,8 @@ pub(crate) fn compact_chunks( .map(|prev_last| prev_last.max(candidate_last)) .or(Some(candidate_last)); + delete_predicates = chunk.delete_predicates(); + chunk.set_compacting(&registration)?; Ok(DbChunk::snapshot(&*chunk)) }) @@ -100,7 +106,13 @@ pub(crate) fn compact_chunks( for id in chunk_ids { partition.force_drop_chunk(id) } - partition.create_rub_chunk(rb_chunk, time_of_first_write, time_of_last_write, schema) + partition.create_rub_chunk( + rb_chunk, + time_of_first_write, + time_of_last_write, + schema, + delete_predicates, + ) }; let guard = new_chunk.read(); diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 120348326d..415d5bce81 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -11,7 +11,10 @@ use data_types::job::Job; use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition}; use observability_deps::tracing::info; use persistence_windows::persistence_windows::FlushHandle; -use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::{ + compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, predicate::Predicate, + QueryChunkMeta, +}; use std::{future::Future, sync::Arc}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -47,6 +50,7 @@ pub fn persist_chunks( let mut time_of_first_write: Option> = None; let mut time_of_last_write: Option> = None; let mut query_chunks = vec![]; + let mut delete_predicates: Arc> = Arc::new(vec![]); for mut chunk in chunks { // Sanity-check assert!(Arc::ptr_eq(&db, &chunk.data().db)); @@ -64,6 +68,8 @@ pub fn persist_chunks( .map(|prev_last| prev_last.max(candidate_last)) .or(Some(candidate_last)); + delete_predicates = chunk.delete_predicates(); + chunk.set_writing_to_object_store(&registration)?; query_chunks.push(DbChunk::snapshot(&*chunk)); } @@ -124,6 +130,7 @@ pub fn persist_chunks( time_of_first_write, time_of_last_write,
Arc::clone(&schema), + Arc::clone(&delete_predicates), ); } @@ -136,6 +143,7 @@ pub fn persist_chunks( time_of_first_write, time_of_last_write, schema, + Arc::clone(&delete_predicates), ), }; let to_persist = to_persist.write(); diff --git a/server/src/db/load.rs b/server/src/db/load.rs index 5bd67fd0a3..fc20b3324c 100644 --- a/server/src/db/load.rs +++ b/server/src/db/load.rs @@ -11,6 +11,7 @@ use parquet_file::{ chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk}, }; use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner}; +use query::predicate::Predicate; use snafu::{ResultExt, Snafu}; use std::sync::Arc; @@ -272,11 +273,14 @@ impl CatalogState for Loader { let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &parquet_chunk.schema()) .map_err(|e| Box::new(e) as _) .context(SchemaError { path: info.path })?; + + let delete_predicates: Arc> = Arc::new(vec![]); // NGA todo: After Marco saves delete predicates into the catalog, they will need to be extracted into this variable partition.insert_object_store_only_chunk( iox_md.chunk_id, parquet_chunk, iox_md.time_of_first_write, iox_md.time_of_last_write, + delete_predicates, ); schema_handle.commit(); diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 0681596067..622af0da37 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1489,6 +1489,7 @@ async fn test_delete() { .await .unwrap(); // todo: The delete function above just parses the input, nothing deleted in DB yet + // todo: should add different tests for different stages of chunks, too // query to verify data deleted // todo: when the delete is done and integrated, the below test must fail