test: more tests to discover what still needs to be done

pull/24376/head
Nga Tran 2021-09-14 17:57:30 -04:00
parent f4f140d3b7
commit 63cc7b3fb0
10 changed files with 173 additions and 44 deletions

@@ -167,7 +167,9 @@ impl Predicate {
     pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) {
         for pred in delete_predicates {
             if let Some(range) = pred.range {
-                let expr = col(TIME_COLUMN_NAME).lt(lit(range.start)).or(col(TIME_COLUMN_NAME).gt(lit(range.end)));
+                let expr = col(TIME_COLUMN_NAME)
+                    .lt(lit(range.start))
+                    .or(col(TIME_COLUMN_NAME).gt(lit(range.end)));
                 self.exprs.push(expr);
             }
         }
@@ -175,14 +177,12 @@ impl Predicate {
     /// Add a list of not(a delete expr)
     pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) {
         for pred in delete_predicates {
            for exp in &pred.exprs {
                self.exprs.push(exp.clone().not());
            }
        }
    }
 }

 impl fmt::Display for Predicate {
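Taken together, these two helpers rewrite a delete into conditions on the rows to *keep*. A minimal stand-alone sketch of the expressions they produce, using the same DataFusion `col`/`lit`/`not` builders the hunks above use (the column names and values are hypothetical):

```rust
use datafusion::logical_plan::{col, lit};

fn main() {
    // A delete over the time range [0, 15] keeps rows outside it:
    // time < 0 OR time > 15
    let keep_range = col("time").lt(lit(0_i64)).or(col("time").gt(lit(15_i64)));

    // Each delete expression (e.g. `bar = 1.0`) is pushed back negated:
    // NOT (bar = 1.0)
    let keep_expr = col("bar").eq(lit(1.0_f64)).not();

    println!("{:?}", keep_range);
    println!("{:?}", keep_expr);
}
```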

@@ -137,7 +137,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
         &self,
         predicate: &Predicate,
         selection: Selection<'_>,
-        delete_predicates: &Vec<Predicate>,
+        delete_predicates: &[Predicate],
     ) -> Result<SendableRecordBatchStream, Self::Error>;

     /// Returns true if data of this chunk is sorted
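The signature change from `&Vec<Predicate>` to `&[Predicate]` is the pattern repeated throughout this commit. A slice parameter accepts both an owned `Vec` (via deref coercion) and a borrowed literal like `&[]`, so callers no longer need to allocate. An illustrative stand-alone sketch (types simplified):

```rust
fn takes_slice(preds: &[i32]) -> usize {
    preds.len()
}

fn main() {
    let owned: Vec<i32> = vec![1, 2, 3];
    assert_eq!(takes_slice(&owned), 3); // &Vec<i32> coerces to &[i32]
    assert_eq!(takes_slice(&[]), 0);    // no Vec needed at the call site
}
```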

@@ -116,9 +116,11 @@ impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
             let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
             let selection = Selection::Some(&selection_cols);

-            let del_preds= chunk.delete_predicates();
-            let stream = chunk.read_filter(&self.predicate, selection, del_preds).map_err(|e| {
+            let del_preds = chunk.delete_predicates();
+            let stream = chunk
+                .read_filter(&self.predicate, selection, del_preds)
+                .map_err(|e| {
                     DataFusionError::Execution(format!(
                         "Error creating scan for table {} chunk {}: {}",
                         self.table_name,

@@ -822,7 +822,7 @@ impl QueryChunk for TestChunk {
         &self,
         predicate: &Predicate,
         _selection: Selection<'_>,
-        _delete_predicates: &Vec<Predicate>,
+        _delete_predicates: &[Predicate],
     ) -> Result<SendableRecordBatchStream, Self::Error> {
         self.check_error()?;
@@ -923,7 +923,7 @@ pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
     for c in chunks {
         let pred = Predicate::default();
         let selection = Selection::All;
-        let delete_predicates: Vec<Predicate> = vec!{};
+        let delete_predicates: Vec<Predicate> = vec![];
         let mut stream = c
             .read_filter(&pred, selection, &delete_predicates)
             .expect("Error in read_filter");

@@ -1285,7 +1285,9 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
     use chrono::{DateTime, TimeZone};
+    use datafusion::logical_plan::{col, lit};
     use futures::{stream, StreamExt, TryStreamExt};
+    use predicate::predicate::PredicateBuilder;
     use tokio_util::sync::CancellationToken;

     use ::test_helpers::assert_contains;
@@ -1920,7 +1922,7 @@ mod tests {
     async fn collect_read_filter(chunk: &DbChunk) -> Vec<RecordBatch> {
         chunk
-            .read_filter(&Default::default(), Selection::All, &vec![])
+            .read_filter(&Default::default(), Selection::All, &[])
             .unwrap()
             .collect::<Vec<_>>()
             .await
@@ -3782,4 +3784,111 @@ mod tests {
             .await
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn test_delete_open_mub() {
+        ::test_helpers::maybe_start_logging();
+
+        let db = make_db().await.db;
+        // Load lines at times 10 and 20 nanoseconds, which belong to the
+        // partition starting at 0 nanoseconds, i.e. "1970-01-01T00"
+        write_lp(db.as_ref(), "cpu bar=1 10").await;
+        write_lp(db.as_ref(), "cpu bar=2 20").await;
+
+        let partition_key = "1970-01-01T00";
+        assert_eq!(vec![partition_key], db.partition_keys().unwrap());
+
+        // ----
+        // There is one open MUB chunk. Let's query it
+        let expected = vec![
+            "+-----+--------------------------------+",
+            "| bar | time                           |",
+            "+-----+--------------------------------+",
+            "| 1   | 1970-01-01T00:00:00.000000010Z |",
+            "| 2   | 1970-01-01T00:00:00.000000020Z |",
+            "+-----+--------------------------------+",
+        ];
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches);
+
+        // ---
+        // Delete the first row
+        let i: f64 = 1.0;
+        let expr = col("bar").eq(lit(i));
+        let pred = PredicateBuilder::new()
+            .table("cpu")
+            .timestamp_range(0, 15)
+            .add_expr(expr)
+            .build();
+        db.delete("cpu", &pred).await.unwrap();
+
+        // When the delete above is issued, the open MUB chunk is frozen with
+        // the delete predicate attached. Let's query again.
+        // NGA todo: since the MUB does not filter data itself, the filtering
+        // needs to be done at the scan step
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches); // will fail once the todo is done
+
+        // --
+        // Since the chunk was already frozen when the delete was issued,
+        // there is nothing further to freeze
+        let mb_open_chunk = db.rollover_partition("cpu", partition_key).await.unwrap();
+        assert!(mb_open_chunk.is_none());
+
+        // ---
+        // Move the MUB chunk to the RUB. The delete predicate is carried
+        // along into the RUB
+        let mb_chunk_id = 0;
+        let _mb_chunk = db.chunk("cpu", partition_key, mb_chunk_id).unwrap();
+        let _rb_chunk = db
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk_id)
+            .await
+            .unwrap();
+
+        // Query again; since the RUB supports delete predicates, we should
+        // now see only 1 row
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        let expected = vec![
+            "+-----+--------------------------------+",
+            "| bar | time                           |",
+            "+-----+--------------------------------+",
+            "| 2   | 1970-01-01T00:00:00.000000020Z |",
+            "+-----+--------------------------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+
+        // ---
+        // Write the RUB chunk to object store but also keep it in the RUB
+        let pq_chunk = db
+            .persist_partition(
+                "cpu",
+                partition_key,
+                Instant::now() + Duration::from_secs(1),
+            )
+            .await
+            .unwrap();
+        let pq_chunk_id = pq_chunk.id();
+
+        // We should have the chunk in both the read buffer and parquet
+        assert!(mutable_chunk_ids(&db, partition_key).is_empty());
+        assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![pq_chunk_id]);
+        assert_eq!(
+            read_parquet_file_chunk_ids(&db, partition_key),
+            vec![pq_chunk_id]
+        );
+
+        // Query again, served from the RUB; we should still see only 1 row
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches);
+
+        // ---
+        // Unload the RUB chunk but keep it in object store
+        let _pq_chunk = db
+            .unload_read_buffer("cpu", partition_key, pq_chunk_id)
+            .unwrap();
+
+        // We should now only have the chunk in object store
+        assert!(mutable_chunk_ids(&db, partition_key).is_empty());
+        assert!(read_buffer_chunk_ids(&db, partition_key).is_empty());
+        assert_eq!(
+            read_parquet_file_chunk_ids(&db, partition_key),
+            vec![pq_chunk_id]
+        );
+
+        // Query again, this time served from object store
+        let batches = run_query(Arc::clone(&db), "select * from cpu").await;
+        assert_batches_sorted_eq!(expected, &batches);
+    }
 }
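For reference, the delete predicate built in this test covers the time range 0..=15 combined with `bar = 1.0`. A quick hand-evaluation against the two written rows shows why exactly one row survives each stage. This is a plain-Rust sketch with a hypothetical helper, assuming the range bounds are inclusive as the `time < start OR time > end` negation in the first hunk implies (the boundary does not matter for these values):

```rust
fn is_deleted(time: i64, bar: f64) -> bool {
    // timestamp_range(0, 15) AND bar = 1.0
    (0..=15).contains(&time) && bar == 1.0
}

fn main() {
    assert!(is_deleted(10, 1.0));  // "cpu bar=1 10" -> deleted
    assert!(!is_deleted(20, 2.0)); // "cpu bar=2 20" -> kept
}
```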

@@ -5,7 +5,11 @@ use chrono::{DateTime, Utc};
 use data_types::{chunk_metadata::ChunkOrder, partition_metadata};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::MemoryStream;
-use internal_types::{access::AccessRecorder, schema::{Schema, sort::SortKey}, selection::Selection};
+use internal_types::{
+    access::AccessRecorder,
+    schema::{sort::SortKey, Schema},
+    selection::Selection,
+};
 use iox_object_store::ParquetFilePath;
 use mutable_buffer::chunk::snapshot::ChunkSnapshot;
 use observability_deps::tracing::debug;
@@ -221,13 +225,16 @@ impl DbChunk {
         self.time_of_last_write
     }

-    pub fn to_rub_negated_predicates(delete_predicates: &Vec<Predicate>) -> Result<Vec<read_buffer::Predicate>> {
+    pub fn to_rub_negated_predicates(
+        delete_predicates: &[Predicate],
+    ) -> Result<Vec<read_buffer::Predicate>> {
         let mut rub_preds: Vec<read_buffer::Predicate> = vec![];
         for pred in delete_predicates {
             let rub_pred = to_read_buffer_predicate(pred).context(PredicateConversion)?;
             rub_preds.push(rub_pred);
         }

+        debug!(?rub_preds, "RUB delete predicates");
         Ok(rub_preds)
     }
 }
@@ -324,7 +331,7 @@ impl QueryChunk for DbChunk {
         &self,
         predicate: &Predicate,
         selection: Selection<'_>,
-        delete_predicates: &Vec<Predicate>,
+        delete_predicates: &[Predicate],
     ) -> Result<SendableRecordBatchStream, Self::Error> {
         // Predicate is not required to be applied for correctness. We only pushed it down
         // when possible for performance gain
@@ -332,15 +339,23 @@ impl QueryChunk for DbChunk {
         debug!(?predicate, "Input Predicate to read_filter");
         self.access_recorder.record_access_now();

+        debug!(?delete_predicates, "Input Delete Predicates to read_filter");
+
         // add negated deleted ranges to the predicate
         let mut pred_with_deleted_ranges = predicate.clone();
         pred_with_deleted_ranges.add_delete_ranges(delete_predicates);
-        debug!(?pred_with_deleted_ranges, "Input Predicate plus deleted ranges");
+        debug!(
+            ?pred_with_deleted_ranges,
+            "Input Predicate plus deleted ranges"
+        );

         // add negated deleted predicates
         let mut pred_wth_deleted_exprs = pred_with_deleted_ranges.clone();
         pred_wth_deleted_exprs.add_delete_exprs(delete_predicates);
-        debug!(?pred_wth_deleted_exprs, "Input Predicate plus deleted ranges and deleted predicates");
+        debug!(
+            ?pred_wth_deleted_exprs,
+            "Input Predicate plus deleted ranges and deleted predicates"
+        );

         match &self.state {
             State::MutableBuffer { chunk, .. } => {
@@ -350,8 +365,9 @@ impl QueryChunk for DbChunk {
             }
             State::ReadBuffer { chunk, .. } => {
                 // Only apply pushdownable predicates
-                let rb_predicate =
-                    match to_read_buffer_predicate(&pred_with_deleted_ranges).context(PredicateConversion) {
+                let rb_predicate = match to_read_buffer_predicate(&pred_with_deleted_ranges)
+                    .context(PredicateConversion)
+                {
                     Ok(predicate) => predicate,
                     Err(_) => read_buffer::Predicate::default(),
                 };
@@ -374,13 +390,11 @@ impl QueryChunk for DbChunk {
                     schema.into(),
                 )))
             }
-            State::ParquetFile { chunk, .. } => {
-                chunk
-                    .read_filter(&pred_wth_deleted_exprs, selection)
-                    .context(ParquetFileChunkError {
-                        chunk_id: self.id(),
-                    })
-            }
+            State::ParquetFile { chunk, .. } => chunk
+                .read_filter(&pred_wth_deleted_exprs, selection)
+                .context(ParquetFileChunkError {
+                    chunk_id: self.id(),
+                }),
         }
     }
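Summarizing the flow above: `read_filter` widens the pushed-down predicate with negated delete ranges and then negated delete expressions before dispatching on chunk state. A condensed sketch of that pipeline, assuming the `Predicate` type from this repo's `predicate` crate (the helper name is hypothetical):

```rust
use predicate::predicate::Predicate;

/// Fold the delete predicates into the scan predicate (sketch only).
fn apply_deletes(scan_pred: &Predicate, deletes: &[Predicate]) -> Predicate {
    let mut pred = scan_pred.clone();
    pred.add_delete_ranges(deletes); // keep: time < start OR time > end
    pred.add_delete_exprs(deletes);  // keep: NOT(delete expr)
    pred
}
```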
@@ -544,7 +558,7 @@ mod tests {
         let t2 = chunk.access_recorder().get_metrics();
         snapshot
-            .read_filter(&Default::default(), Selection::All, &vec![])
+            .read_filter(&Default::default(), Selection::All, &[])
             .unwrap();
         let t3 = chunk.access_recorder().get_metrics();

@@ -144,6 +144,8 @@ pub fn persist_chunks(
         );
     }

+    // NGA todo: we hit this error if there are rows but they have all been
+    // deleted. Need to think of a way to handle this -- open a ticket before merge
     let to_persist = to_persist.expect("should be rows to persist");

     let (new_chunk_id, new_chunk) = partition_write.create_rub_chunk(
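One possible shape for the guard that todo asks for, sketched against the surrounding code (the early-return value is a placeholder, since the function's return type is not shown in this hunk): bail out instead of panicking when deletes have emptied the persistence window.

```rust
// Sketch only: skip persistence when deletes removed every row,
// rather than panicking in `expect`.
let to_persist = match to_persist {
    Some(chunk) => chunk,
    None => {
        debug!("no rows left to persist after deletes; skipping");
        return Ok(None); // placeholder; match the real return type here
    }
};
```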

@@ -87,11 +87,13 @@ pub(super) fn write_chunk_to_object_store(
     let (partition_checkpoint, database_checkpoint) =
         collect_checkpoints(flush_handle.checkpoint(), &db.catalog);

     // Get RecordBatchStream of data from the read buffer chunk
     let stream = db_chunk
-        .read_filter(&Default::default(), Selection::All, db_chunk.delete_predicates())
+        .read_filter(
+            &Default::default(),
+            Selection::All,
+            db_chunk.delete_predicates(),
+        )
         .expect("read filter should be infallible");

     // check that the upcoming state change will very likely succeed

@@ -1491,8 +1491,8 @@ async fn test_delete() {
     // Delete some data
     let table = "cpu";
     let start = "0";
-    let stop = "12000000000000";
-    let pred = "region = west";
+    let stop = "1200";
+    let pred = r#"region = "west""#;
     let _del = management_client
         .delete(db_name.clone(), table, start, stop, pred)
         .await