feat: save delete predicates in chunks

pull/24376/head
Nga Tran 2021-09-10 17:16:18 -04:00
parent fa47fb5582
commit 3798ca09bb
8 changed files with 176 additions and 42 deletions

View File

@ -4,6 +4,7 @@
pub(crate) use crate::db::chunk::DbChunk;
use crate::{
db::{
self,
access::QueryCatalogAccess,
catalog::{
chunk::{CatalogChunk, ChunkStage},
@ -40,7 +41,7 @@ use parquet_file::catalog::{
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
predicate::Predicate,
predicate::{Predicate, PredicateBuilder},
QueryDatabase,
};
use rand_distr::{Distribution, Poisson};
@ -125,6 +126,9 @@ pub enum Error {
source: CatalogError,
},
#[snafu(display("Internal error while adding predicate predicate to chunk: {}", source))]
AddDeletePredicateError { source: db::catalog::chunk::Error },
#[snafu(display(
"Storing sequenced entry failed with the following error(s), and possibly more: {}",
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
@ -551,7 +555,9 @@ impl Db {
info!(%table_name, %partition_key, found_chunk=chunk.is_some(), "rolling over a partition");
if let Some(chunk) = chunk {
let mut chunk = chunk.write();
chunk.freeze().context(FreezingChunk)?;
chunk
.freeze(&PredicateBuilder::default().build())
.context(FreezingChunk)?;
Ok(Some(DbChunk::snapshot(&chunk)))
} else {
@ -653,9 +659,11 @@ impl Db {
let partition = partition.write();
let chunks = partition.chunks();
for chunk in chunks {
// NGA todo: verify where to close MUB before adding the predicate
// save the delete predicate in the chunk
let mut chunk = chunk.write();
chunk.add_delete_predicate(delete_predicate);
chunk
.add_delete_predicate(delete_predicate)
.context(AddDeletePredicateError)?;
}
}
@ -1304,7 +1312,9 @@ impl Db {
let handle_chunk_write = |chunk: &mut CatalogChunk| {
chunk.record_write(time_of_write, &timestamp_summary);
if chunk.storage().0 >= mub_row_threshold.get() {
chunk.freeze().expect("freeze mub chunk");
chunk
.freeze(&PredicateBuilder::default().build())
.expect("freeze mub chunk");
}
};
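
With this change `freeze` always takes a predicate, so call sites that have nothing to delete pass an empty one. A minimal sketch of the two kinds of predicates involved, built only with calls that appear in this diff (the table name, time range, and expression are illustrative values):

    use datafusion::logical_plan::{col, lit};
    use query::predicate::PredicateBuilder;

    // Empty predicate: what rollover_partition and the MUB row-threshold freeze
    // now pass, meaning "freeze this chunk, no rows are deleted".
    let no_deletes = PredicateBuilder::default().build();

    // A real delete predicate, built the same way as in the new chunk tests:
    // scoped to a table, a time range, and a column expression.
    let delete_boston = PredicateBuilder::new()
        .table("test")
        .timestamp_range(1, 100)
        .add_expr(col("city").eq(lit("Boston")))
        .build();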

View File

@ -13,7 +13,7 @@ use internal_types::{access::AccessRecorder, schema::Schema};
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use query::predicate::Predicate;
use query::predicate::{Predicate, PredicateBuilder};
use read_buffer::RBChunk;
use snafu::Snafu;
use std::sync::Arc;
@ -295,12 +295,13 @@ impl CatalogChunk {
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
) -> Self {
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(chunk.table_summary()),
schema,
delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate
delete_predicates: Arc::clone(&delete_predicates),
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
@ -327,6 +328,7 @@ impl CatalogChunk {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
) -> Self {
assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@ -334,7 +336,7 @@ impl CatalogChunk {
let meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(chunk.table_summary()),
schema: chunk.schema(),
delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate
delete_predicates: Arc::clone(&delete_predicates),
});
let stage = ChunkStage::Persisted {
@ -451,31 +453,45 @@ impl CatalogChunk {
}
}
pub fn add_delete_predicate(&mut self, _delete_predicate: &Predicate) {
pub fn add_delete_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
match &mut self.stage {
ChunkStage::Open { mb_chunk: _ } => {
// Freeze/close this chunk, then record the delete_predicate on the now-frozen chunk
self.freeze(delete_predicate)?;
}
ChunkStage::Frozen { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
});
}
ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
});
}
}
Ok(())
}
pub fn delete_predicates(&mut self) -> Arc<Vec<Predicate>> {
match &self.stage {
ChunkStage::Open { mb_chunk: _ } => {
// NGA todo:
// Close the MUB
// Add the delete_predicate to it
}
ChunkStage::Frozen { representation, .. } => match representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_snapshot) => {
// NGA todo
}
ChunkStageFrozenRepr::ReadBuffer(_rb_chunk) => {
// NGA todo
}
},
ChunkStage::Persisted {
parquet: _,
read_buffer: Some(_read_buffer),
..
} => {
// NGA todo
}
ChunkStage::Persisted { parquet: _, .. } => {
// NGA todo
// no delete predicate for open chunk
Arc::new(vec![])
}
ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.delete_predicates),
ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.delete_predicates),
}
}
@ -643,7 +659,7 @@ impl CatalogChunk {
///
/// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage
/// (no-op) and will fail for other stages.
pub fn freeze(&mut self) -> Result<()> {
pub fn freeze(&mut self, delete_predicate: &Predicate) -> Result<()> {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk");
@ -656,7 +672,7 @@ impl CatalogChunk {
let metadata = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: s.full_schema(),
delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the mb_chunk if appropriate
delete_predicates: Arc::new(vec![delete_predicate.clone()]),
};
self.stage = ChunkStage::Frozen {
@ -684,7 +700,7 @@ impl CatalogChunk {
// This ensures the closing logic is consistent but doesn't break code that
// assumes a chunk can be moved from open
if matches!(self.stage, ChunkStage::Open { .. }) {
self.freeze()?;
self.freeze(&PredicateBuilder::default().build())?;
}
match &self.stage {
@ -714,7 +730,7 @@ impl CatalogChunk {
match &self.stage {
ChunkStage::Open { .. } | ChunkStage::Frozen { .. } => {
self.set_lifecycle_action(ChunkLifecycleAction::Compacting, registration)?;
self.freeze()?;
self.freeze(&PredicateBuilder::default().build())?;
Ok(())
}
ChunkStage::Persisted { .. } => {
@ -736,7 +752,7 @@ impl CatalogChunk {
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema,
delete_predicates: Arc::new(vec![]), //NGA todo: consider to use the one of the given chunk if appropriate
delete_predicates: Arc::clone(&meta.delete_predicates),
});
match &representation {
@ -766,7 +782,7 @@ impl CatalogChunk {
// This ensures the closing logic is consistent but doesn't break code that
// assumes a chunk can be moved from open
if matches!(self.stage, ChunkStage::Open { .. }) {
self.freeze()?;
self.freeze(&PredicateBuilder::default().build())?;
}
match &self.stage {
@ -922,6 +938,7 @@ impl CatalogChunk {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::logical_plan::{col, lit};
use entry::test_helpers::lp_to_entry;
use mutable_buffer::chunk::ChunkMetrics as MBChunkMetrics;
use parquet_file::{
@ -930,6 +947,7 @@ mod tests {
make_chunk as make_parquet_chunk_with_store, make_iox_object_store, TestSize,
},
};
use query::predicate::PredicateBuilder;
#[test]
fn test_new_open() {
@ -942,17 +960,18 @@ mod tests {
let mut chunk = make_open_chunk();
// close it
chunk.freeze().unwrap();
let delete_pred = PredicateBuilder::default().build();
chunk.freeze(&delete_pred).unwrap();
assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. }));
// closing a second time is a no-op
chunk.freeze().unwrap();
chunk.freeze(&delete_pred).unwrap();
assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. }));
// closing a chunk in persisted state will fail
let mut chunk = make_persisted_chunk().await;
assert_eq!(
chunk.freeze().unwrap_err().to_string(),
chunk.freeze(&delete_pred).unwrap_err().to_string(),
"Internal Error: unexpected chunk state for Chunk('db':'table1':'part1':0) \
during setting closed. Expected Open or Frozen, got Persisted"
);
@ -1086,6 +1105,75 @@ mod tests {
chunk.clear_lifecycle_action().unwrap();
}
#[test]
fn test_add_delete_predicate_open_chunk() {
let mut chunk = make_open_chunk();
let registration = TaskRegistration::new();
// no delete_predicate yet
let del_preds = chunk.delete_predicates();
assert_eq!(del_preds.len(), 0);
// Build delete predicate and expected output
let expr1 = col("city").eq(lit("Boston"));
let del_pred1 = PredicateBuilder::new()
.table("test")
.timestamp_range(1, 100)
.add_expr(expr1)
.build();
let mut expected_exprs1 = vec![];
let e = col("city").eq(lit("Boston"));
expected_exprs1.push(e);
// Add a delete predicate to the open chunk: simulates a delete on an open chunk
chunk.add_delete_predicate(&del_pred1).unwrap();
// chunk must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have a delete predicate
let del_preds = chunk.delete_predicates();
assert_eq!(del_preds.len(), 1);
// verify delete predicate value
let pred = &del_preds[0];
if let Some(range) = pred.range {
assert_eq!(range.start, 1); // start time
assert_eq!(range.end, 100); // end time
} else {
panic!("No time range set for delete predicate");
}
assert_eq!(pred.exprs, expected_exprs1);
// Move the chunk
chunk.set_moving(&registration).unwrap();
// The chunk should still be frozen
assert_eq!(chunk.stage().name(), "Frozen");
// Add another delete predicate: simulates a second delete
// Build delete predicate and expected output
let expr2 = col("cost").not_eq(lit(15));
let del_pred2 = PredicateBuilder::new()
.table("test")
.timestamp_range(20, 50)
.add_expr(expr2)
.build();
let mut expected_exprs2 = vec![];
let e = col("cost").not_eq(lit(15));
expected_exprs2.push(e);
chunk.add_delete_predicate(&del_pred2).unwrap();
// chunk must still be in frozen stage
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have 2 delete predicates
let del_preds = chunk.delete_predicates();
assert_eq!(del_preds.len(), 2);
// verify the second delete predicate value
let pred = &del_preds[1];
if let Some(range) = pred.range {
assert_eq!(range.start, 20); // start time
assert_eq!(range.end, 50); // end time
} else {
panic!("No time range set for delete predicate");
}
assert_eq!(pred.exprs, expected_exprs2);
}
fn make_mb_chunk(table_name: &str) -> MBChunk {
let entry = lp_to_entry(&format!("{} bar=1 10", table_name));
let write = entry.partition_writes().unwrap().remove(0);
@ -1134,6 +1222,7 @@ mod tests {
now,
now,
ChunkMetrics::new_unregistered(),
Arc::new(vec![] as Vec<Predicate>),
)
}
}
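
A condensed sketch of what the new `test_add_delete_predicate_open_chunk` test above verifies, reusing this file's test helpers and imports (`make_open_chunk`, `PredicateBuilder`, `col`, `lit`): adding a delete predicate to an Open chunk freezes it first, and the predicate then shows up on the frozen metadata:

    let mut chunk = make_open_chunk();
    assert_eq!(chunk.delete_predicates().len(), 0); // open chunks report no predicates

    let pred = PredicateBuilder::new()
        .table("test")
        .timestamp_range(1, 100)
        .add_expr(col("city").eq(lit("Boston")))
        .build();

    // On an Open chunk this freezes the chunk first and records the predicate on
    // the frozen metadata; on Frozen/Persisted chunks it appends to the existing list.
    chunk.add_delete_predicate(&pred).unwrap();
    assert_eq!(chunk.stage().name(), "Frozen");
    assert_eq!(chunk.delete_predicates().len(), 1);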

View File

@ -12,6 +12,7 @@ use observability_deps::tracing::info;
use persistence_windows::{
min_max_sequence::OptionalMinMaxSequence, persistence_windows::PersistenceWindows,
};
use query::predicate::Predicate;
use snafu::Snafu;
use std::{
collections::{btree_map::Entry, BTreeMap},
@ -160,6 +161,7 @@ impl Partition {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
delete_predicates: Arc<Vec<Predicate>>,
) -> Arc<RwLock<CatalogChunk>> {
let chunk_id = self.next_chunk_id;
assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");
@ -175,6 +177,7 @@ impl Partition {
time_of_last_write,
schema,
self.metrics.new_chunk_metrics(),
delete_predicates,
)));
if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
@ -195,6 +198,7 @@ impl Partition {
chunk: Arc<parquet_file::chunk::ParquetChunk>,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
delete_predicates: Arc<Vec<Predicate>>,
) -> Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());
@ -208,6 +212,7 @@ impl Partition {
time_of_first_write,
time_of_last_write,
self.metrics.new_chunk_metrics(),
Arc::clone(&delete_predicates),
)),
);
@ -359,6 +364,7 @@ mod tests {
let t = Utc::now();
let schema = SchemaBuilder::new().timestamp().build().unwrap();
let schema = Arc::new(schema);
let delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]);
let rb = RecordBatch::try_new(
schema.as_arrow(),
vec![Arc::new(TimestampNanosecondArray::from_iter_values([
@ -374,18 +380,21 @@ mod tests {
t,
t,
Arc::clone(&schema),
Arc::clone(&delete_predicates),
);
partition.create_rub_chunk(
RBChunk::new("t", rb.clone(), ChunkMetrics::new(&domain)),
t,
t,
Arc::clone(&schema),
Arc::clone(&delete_predicates),
);
partition.create_rub_chunk(
RBChunk::new("t", rb, ChunkMetrics::new(&domain)),
t,
t,
Arc::clone(&schema),
Arc::clone(&delete_predicates),
);
// should be in ascending order

View File

@ -314,6 +314,7 @@ impl QueryChunk for DbChunk {
Ok(pred_result)
}
// NGA todo: add delete predicate here to eliminate data at query time
fn read_filter(
&self,
predicate: &Predicate,
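
The `NGA todo` above marks where the stored delete predicates would eventually filter rows at query time. Purely as an illustration of the idea (not what this commit implements, assuming DataFusion's `Expr` combinators, and with time-range handling omitted), one could negate each delete predicate's expressions and AND the result into the scan:

    use datafusion::logical_plan::Expr;
    use query::predicate::Predicate;

    // Illustrative only: build one expression that keeps rows matching *none* of the
    // chunk's delete predicates (a predicate deletes rows matching all of its exprs).
    fn keep_rows_expr(delete_predicates: &[Predicate]) -> Option<Expr> {
        let mut keep: Option<Expr> = None;
        for del in delete_predicates {
            // AND together the exprs of one delete predicate...
            let mut matches: Option<Expr> = None;
            for e in &del.exprs {
                matches = Some(match matches {
                    Some(acc) => acc.and(e.clone()),
                    None => e.clone(),
                });
            }
            // ...then keep only rows that do NOT match it.
            if let Some(m) = matches {
                let not_deleted = Expr::Not(Box::new(m));
                keep = Some(match keep {
                    Some(acc) => acc.and(not_deleted),
                    None => not_deleted,
                });
            }
        }
        keep
    }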

View File

@ -10,7 +10,10 @@ use chrono::{DateTime, Utc};
use data_types::job::Job;
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use query::{
compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, predicate::Predicate,
QueryChunkMeta,
};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -39,6 +42,7 @@ pub(crate) fn compact_chunks(
let mut input_rows = 0;
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]);
let query_chunks = chunks
.into_iter()
.map(|mut chunk| {
@ -58,6 +62,8 @@ pub(crate) fn compact_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
delete_predicates = chunk.delete_predicates();
chunk.set_compacting(&registration)?;
Ok(DbChunk::snapshot(&*chunk))
})
@ -100,7 +106,13 @@ pub(crate) fn compact_chunks(
for id in chunk_ids {
partition.force_drop_chunk(id)
}
partition.create_rub_chunk(rb_chunk, time_of_first_write, time_of_last_write, schema)
partition.create_rub_chunk(
rb_chunk,
time_of_first_write,
time_of_last_write,
schema,
delete_predicates,
)
};
let guard = new_chunk.read();
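
Note that `delete_predicates = chunk.delete_predicates();` inside the map closure above overwrites the captured value on each iteration, so the compacted chunk inherits only the last input chunk's predicates. If deletes from every input chunk should carry over, a merging helper might look like this sketch (illustrative only, not what the commit does):

    use std::sync::Arc;
    use query::predicate::Predicate;

    // Merge the delete predicates of all input chunks instead of keeping only the
    // predicates of the last chunk visited.
    fn merge_delete_predicates(per_chunk: &[Arc<Vec<Predicate>>]) -> Arc<Vec<Predicate>> {
        let mut merged = Vec::new();
        for preds in per_chunk {
            merged.extend(preds.iter().cloned());
        }
        Arc::new(merged)
    }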

View File

@ -11,7 +11,10 @@ use data_types::job::Job;
use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
use observability_deps::tracing::info;
use persistence_windows::persistence_windows::FlushHandle;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use query::{
compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, predicate::Predicate,
QueryChunkMeta,
};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -47,6 +50,7 @@ pub fn persist_chunks(
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut query_chunks = vec![];
let mut delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]);
for mut chunk in chunks {
// Sanity-check
assert!(Arc::ptr_eq(&db, &chunk.data().db));
@ -64,6 +68,8 @@ pub fn persist_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
delete_predicates = chunk.delete_predicates();
chunk.set_writing_to_object_store(&registration)?;
query_chunks.push(DbChunk::snapshot(&*chunk));
}
@ -124,6 +130,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
Arc::clone(&schema),
Arc::clone(&delete_predicates),
);
}
@ -136,6 +143,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
schema,
Arc::clone(&delete_predicates),
),
};
let to_persist = to_persist.write();
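
Both `create_rub_chunk` calls above receive `Arc::clone(&delete_predicates)`, so the remainder chunk and the chunk being persisted share one predicate list rather than holding copies; cloning the `Arc` is a pointer copy, as this tiny sketch illustrates:

    use std::sync::Arc;
    use query::predicate::Predicate;

    // Cloning the Arc only bumps a reference count; both new chunks end up
    // pointing at the same underlying Vec<Predicate>.
    let delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]);
    let shared = Arc::clone(&delete_predicates);
    assert!(Arc::ptr_eq(&shared, &delete_predicates));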

View File

@ -11,6 +11,7 @@ use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
};
use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
use query::predicate::Predicate;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
@ -272,11 +273,14 @@ impl CatalogState for Loader {
let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &parquet_chunk.schema())
.map_err(|e| Box::new(e) as _)
.context(SchemaError { path: info.path })?;
let delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]); // NGA todo: once Marco's work to save delete predicates into the catalog lands, they will need to be extracted into this variable
partition.insert_object_store_only_chunk(
iox_md.chunk_id,
parquet_chunk,
iox_md.time_of_first_write,
iox_md.time_of_last_write,
delete_predicates,
);
schema_handle.commit();

View File

@ -1489,6 +1489,7 @@ async fn test_delete() {
.await
.unwrap();
// todo: The delete function above just parses the input; nothing is deleted in the DB yet
// todo: should add different tests for different stages of chunks, too
// query to verify data deleted
// todo: when the delete is done and integrated, the test below must fail