diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs
index 484fea648a..3a5db3041f 100644
--- a/read_buffer/src/chunk.rs
+++ b/read_buffer/src/chunk.rs
@@ -156,7 +156,7 @@ impl Chunk {
         predicate: Predicate,
         select_columns: Selection<'_>,
     ) -> table::ReadFilterResults {
-        self.table.read_filter(&select_columns, &predicate)
+        self.table.read_filter(&select_columns, &predicate, &[])
     }
 
     /// Returns an iterable collection of data in group columns and aggregate
@@ -949,8 +949,7 @@ mod test {
         );
     }
 
-    #[test]
-    fn read_filter() {
+    fn read_filter_setup() -> Chunk {
         let mut chunk: Option<Chunk> = None;
 
         // Add a bunch of row groups to a single table in a single chunk
@@ -1007,9 +1006,13 @@ mod test {
                 }
             }
         }
+        chunk.unwrap()
+    }
 
+    #[test]
+    fn read_filter() {
         // Chunk should be initialized now.
-        let chunk = chunk.unwrap();
+        let chunk = read_filter_setup();
 
         // Build the operation equivalent to the following query:
         //
@@ -1058,6 +1061,59 @@ mod test {
         assert!(itr.next().is_none());
     }
 
+    #[test]
+    fn read_filter_with_deletes() {
+        // Chunk should be initialized now.
+        let chunk = read_filter_setup();
+
+        // Build the operation equivalent to the following query:
+        //
+        //   SELECT * FROM "table_1" WHERE "env" = 'us-west';
+        //
+        // But also assume the following delete has been applied:
+        //
+        //   DELETE FROM "table_1" WHERE "region" = "west"
+        //
+        let predicate = Predicate::new(vec![BinaryExpr::from(("env", "=", "us-west"))]);
+
+        let mut itr = chunk.read_filter(predicate, Selection::All);
+
+        let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
+        let exp_region_values = Values::Dictionary(vec![0], vec![Some("east")]);
+        let exp_counter_values = Values::F64(vec![4500.3]);
+        let exp_sketchy_sensor_values = Values::I64N(vec![Some(44)]);
+        let exp_active_values = Values::Bool(vec![Some(false)]);
+        let exp_msg_values = Values::String(vec![None]);
+
+        let first_row_group = itr.next().unwrap();
+        assert_rb_column_equals(&first_row_group, "env", &exp_env_values);
+        assert_rb_column_equals(&first_row_group, "region", &exp_region_values);
+        assert_rb_column_equals(&first_row_group, "counter", &exp_counter_values);
+        assert_rb_column_equals(
+            &first_row_group,
+            "sketchy_sensor",
+            &exp_sketchy_sensor_values,
+        );
+        assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
+        assert_rb_column_equals(&first_row_group, "msg", &exp_msg_values);
+        assert_rb_column_equals(&first_row_group, "time", &Values::I64(vec![100])); // first row from first record batch
+
+        let second_row_group = itr.next().unwrap();
+        assert_rb_column_equals(&second_row_group, "env", &exp_env_values);
+        assert_rb_column_equals(&second_row_group, "region", &exp_region_values);
+        assert_rb_column_equals(&second_row_group, "counter", &exp_counter_values);
+        assert_rb_column_equals(
+            &second_row_group,
+            "sketchy_sensor",
+            &exp_sketchy_sensor_values,
+        );
+        assert_rb_column_equals(&second_row_group, "active", &exp_active_values);
+        assert_rb_column_equals(&second_row_group, "time", &Values::I64(vec![200])); // first row from second record batch
+
+        // No more data
+        assert!(itr.next().is_none());
+    }
+
     #[test]
     fn could_pass_predicate() {
         let chunk = ChunkBuilder::default().build();
diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs
index 66c6c54dd2..b1397ea53a 100644
--- a/read_buffer/src/row_group.rs
+++ b/read_buffer/src/row_group.rs
@@ -259,7 +259,8 @@ impl RowGroup {
     pub fn read_filter(
         &self,
         columns: &[ColumnName<'_>],
-        predicates: &Predicate,
+        predicate: &Predicate,
+        delete_predicates: &[Predicate],
     ) -> ReadFilterResult<'_> {
         let select_columns = self.meta.schema_for_column_names(columns);
         assert_eq!(select_columns.len(), columns.len());
@@ -269,9 +270,50 @@ impl RowGroup {
             ..Default::default()
         };
 
-        // apply predicates to determine candidate rows.
-        let row_ids = self.row_ids_from_predicate(predicates);
-        let col_data = self.materialise_rows(&schema, row_ids);
+        // apply predicate to determine candidate rows.
+        let row_ids = self.row_ids_from_predicate(predicate);
+
+        // identify rows that have been marked as deleted.
+        let deleted_row_ids = self.row_ids_from_delete_predicates(delete_predicates);
+
+        // determine final candidate rows
+        let final_row_ids = match (row_ids, deleted_row_ids) {
+            // no matching rows
+            (RowIDsOption::None(_), _) => RowIDsOption::new_none(),
+            // everything marked deleted
+            (_, RowIDsOption::All(_)) => RowIDsOption::new_none(),
+            // nothing to delete
+            (row_ids, RowIDsOption::None(_)) => row_ids,
+            // in these cases some rows have been deleted
+            (RowIDsOption::Some(mut row_ids), RowIDsOption::Some(delete_row_ids)) => {
+                row_ids.relative_complement(&delete_row_ids);
+                if row_ids.is_empty() {
+                    RowIDsOption::new_none()
+                } else {
+                    RowIDsOption::Some(row_ids)
+                }
+            }
+            (RowIDsOption::All(mut row_ids), RowIDsOption::Some(delete_row_ids)) => {
+                // Recall that the `All` variant for `RowIDsOption` is an
+                // optimisation to represent all row IDs in the column without
+                // having to materialise the bitset. In this case however, we
+                // will have to materialise the bitset in order to calculate the
+                // relative complement.
+                row_ids.add_range(0, self.rows());
+                row_ids.relative_complement(&delete_row_ids);
+
+                // N.B we can't remove all rows since there are more selected
+                // rows than deleted rows - we always end up deleting no rows
+                // or some rows.
+                if row_ids.len() == self.rows() as usize {
+                    RowIDsOption::All(row_ids) // all selected rows remain and they're all rows in column
+                } else {
+                    RowIDsOption::Some(row_ids)
+                }
+            }
+        };
+
+        let col_data = self.materialise_rows(&schema, final_row_ids);
         ReadFilterResult {
             schema,
             data: col_data,
@@ -416,6 +458,31 @@ impl RowGroup {
         )
     }
 
+    // Determines the set of row ids that satisfy *any* of the provided
+    // collection of predicates. To be included in the returned set of row IDs
+    // a row must satisfy all expressions within a single predicate, but need
+    // not satisfy more than one of the predicates.
+    fn row_ids_from_delete_predicates(&self, predicates: &[Predicate]) -> RowIDsOption {
+        if predicates.is_empty() {
+            return RowIDsOption::new_none();
+        }
+        let mut result_row_ids = RowIDs::new_bitmap();
+
+        for predicate in predicates {
+            match self.row_ids_from_predicate(predicate) {
+                RowIDsOption::None(_) => continue, // no rows match this predicate
+                RowIDsOption::Some(row_ids) => result_row_ids.union(&row_ids), // add row IDs to final result
+                RowIDsOption::All(row_ids) => return RowIDsOption::All(row_ids), // everything deleted
+            }
+        }
+
+        match result_row_ids.len() {
+            0 => RowIDsOption::None(result_row_ids),
+            x if x == self.rows() as usize => RowIDsOption::All(result_row_ids),
+            _ => RowIDsOption::Some(result_row_ids),
+        }
+    }
+
     /// Materialises a collection of data in group columns and aggregate
     /// columns, optionally filtered by the provided predicate.
     ///
@@ -2457,7 +2524,104 @@ mod test {
     }
 
     #[test]
-    fn read_filter() {
+    fn row_ids_from_delete_predicates() {
+        let mut columns = vec![];
+        let tc = ColumnType::Time(Column::from(&[100_i64, 200, 500, 600, 300, 300][..]));
+        columns.push(("time".to_string(), tc));
+        let rc = ColumnType::Tag(Column::from(
+            &["west", "west", "east", "west", "south", "north"][..],
+        ));
+        columns.push(("region".to_string(), rc));
+        let row_group = RowGroup::new(6, columns);
+
+        // No predicates means nothing to delete
+        let row_ids = row_group.row_ids_from_delete_predicates(&[]);
+        assert!(matches!(row_ids, RowIDsOption::None(_)));
+
+        // A predicate that doesn't match anything
+        let row_ids = row_group.row_ids_from_delete_predicates(&[Predicate::with_time_range(
+            &[],
+            1000,
+            1200,
+        )]);
+        assert!(matches!(row_ids, RowIDsOption::None(_)));
+
+        // Table looks like:
+        //
+        //     region | time
+        // 0   west   | 100
+        // 1   west   | 200
+        // 2   east   | 500
+        // 3   west   | 600
+        // 4   south  | 300
+        // 5   north  | 300
+
+        // cases that will mark rows deleted
+        let cases = vec![
+            // Partially covering "time range" predicate
+            (
+                vec![Predicate::with_time_range(&[], 200, 600)],
+                vec![1, 2, 4, 5],
+            ),
+            // Partially covering "region" predicate
+            (
+                vec![Predicate::new(vec![BinaryExpr::from((
+                    "region", "=", "west",
+                ))])],
+                vec![0, 1, 3],
+            ),
+            // Two separate partially covering "region" predicates
+            (
+                vec![
+                    Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
+                    Predicate::new(vec![BinaryExpr::from(("region", "=", "south"))]),
+                ],
+                vec![0, 1, 3, 4], // all rows with region=west OR region=south
+            ),
+            // A partially covering "region" predicates with a conjunctive time expression
+            // And a separate "region" predicate. This example is equivalent to
+            // two delete commands like:
+            //
+            // DELETE FROM t WHERE region = "west" AND time < 500;
+            // DELETE FROM t WHERE region = "south";
+            (
+                vec![
+                    Predicate::new(vec![
+                        BinaryExpr::from(("region", "=", "west")),
+                        BinaryExpr::from(("time", "<", 500_i64)),
+                    ]),
+                    Predicate::new(vec![BinaryExpr::from(("region", "=", "south"))]),
+                ],
+                vec![0, 1, 4], // any row with (region=west AND time < 500) OR any rows with region = south
+            ),
+            // a predicate that fully covers the column.
+            (
+                vec![Predicate::new(vec![BinaryExpr::from((
+                    "region",
+                    "!=",
+                    "south-west",
+                ))])],
+                vec![0, 1, 2, 3, 4, 5], // all rows should be marked as deleted
+            ),
+            // multiple predicates that in union cover the column
+            (
+                vec![
+                    Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
+                    Predicate::new(vec![BinaryExpr::from(("region", "=", "south"))]),
+                    Predicate::new(vec![BinaryExpr::from(("region", "=", "north"))]),
+                    Predicate::new(vec![BinaryExpr::from(("region", "<", "easter"))]),
+                ],
+                vec![0, 1, 2, 3, 4, 5], // all rows should be marked as deleted
+            ),
+        ];
+
+        for (delete_predicates, exp_row_ids) in cases {
+            let row_ids = row_group.row_ids_from_delete_predicates(delete_predicates.as_slice());
+            assert_eq!(row_ids.unwrap().to_vec(), exp_row_ids);
+        }
+    }
+
+    fn _read_filter_setup() -> RowGroup {
         let mut columns = vec![];
         let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
         columns.push(("time".to_string(), tc));
@@ -2475,8 +2639,12 @@ mod test {
         let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10][..]));
         columns.push(("count".to_string(), fc));
 
-        let row_group = RowGroup::new(6, columns);
+        RowGroup::new(6, columns)
+    }
 
+    #[test]
+    fn read_filter() {
+        let row_group = _read_filter_setup();
         let cases = vec![
             (
                 vec!["count", "region", "time"],
@@ -2533,7 +2701,7 @@ west,4
         ];
 
         for (cols, predicates, expected) in cases {
-            let results = row_group.read_filter(&cols, &predicates);
+            let results = row_group.read_filter(&cols, &predicates, &[]);
             assert_eq!(format!("{:?}", &results), expected);
         }
 
@@ -2541,29 +2709,14 @@ west,4
         let results = row_group.read_filter(
             &["method", "region", "time"],
             &Predicate::with_time_range(&[], -19, 1),
+            &[],
         );
         assert!(results.is_empty());
     }
 
     #[test]
     fn read_filter_dictionaries() {
-        let mut columns = vec![];
-        let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
-        columns.push(("time".to_string(), tc));
-
-        // Tag column that will be dictionary encoded when materialised
-        let rc = ColumnType::Tag(Column::from(
-            &["west", "west", "east", "west", "south", "north"][..],
-        ));
-        columns.push(("region".to_string(), rc));
-
-        // Field column that will be stored as a string array when materialised
-        let mc = ColumnType::Field(Column::from(
-            &["GET", "POST", "POST", "POST", "PUT", "GET"][..],
-        ));
-        columns.push(("method".to_string(), mc));
-
-        let row_group = RowGroup::new(6, columns);
+        let row_group = _read_filter_setup();
 
         let cases = vec![
             (
@@ -2589,7 +2742,7 @@ POST,west,2
         ];
 
         for (cols, predicates, expected) in cases {
-            let results = row_group.read_filter(&cols, &predicates);
+            let results = row_group.read_filter(&cols, &predicates, &[]);
             assert_eq!(format!("{:?}", &results), expected);
         }
     }
@@ -2624,10 +2777,78 @@ POST,west,2
         let results = row_group.read_filter(
             &["state"],
             &Predicate::with_time_range(&[BinaryExpr::from(("state", "=", "NY"))], 1, 300),
+            &[],
         );
         assert!(results.is_empty());
     }
 
+    #[test]
+    fn read_filter_with_deletes() {
+        let row_group = _read_filter_setup();
+        // These cases enumerate the possible match states in `read_filter`
+        let cases = vec![
+            // select all rows and delete all rows
+            (
+                vec!["count", "region", "time"],
+                Predicate::with_time_range(&[], 1, 7),
+                vec![Predicate::with_time_range(&[], 0, 10)],
+                vec!["count,region,time", ""],
+            ),
+            // select some rows and delete no matching rows
+            (
+                vec!["count", "region", "time"],
+                Predicate::with_time_range(&[], 2, 4),
+                vec![Predicate::with_time_range(&[], 200, 300)],
+                vec!["count,region,time", "101,west,2", "200,east,3", ""],
+            ),
+            // select some rows and delete some others
+            (
+                vec!["count", "region", "time"],
+                Predicate::with_time_range(&[], 1, 6),
+                vec![Predicate::with_time_range(&[], 2, 4)], // delete rows 101,west,2 and 200,east,3
+                vec![
+                    "count,region,time",
+                    "100,west,1",
+                    "203,west,4",
+                    "203,south,5",
+                    "",
+                ],
+            ),
+            // select some rows and delete some others that are not selected
+            (
+                vec!["count", "region", "time"],
+                Predicate::with_time_range(&[], 3, 6),
+                vec![Predicate::with_time_range(&[], 0, 2)], // delete rows that are not in the result set
+                vec![
+                    "count,region,time",
+                    "200,east,3",
+                    "203,west,4",
+                    "203,south,5",
+                    "",
+                ],
+            ),
+            // select all rows and delete some others
+            (
+                vec!["count", "region", "time"],
+                Predicate::with_time_range(&[], 1, 7),
+                vec![Predicate::with_time_range(&[], 2, 4)], // delete rows 101,west,2 and 200,east,3
+                vec![
+                    "count,region,time",
+                    "100,west,1",
+                    "203,west,4",
+                    "203,south,5",
+                    "10,north,6",
+                    "",
+                ],
+            ),
+        ];
+
+        for (cols, predicate, delete_predicates, expected) in cases {
+            let results = row_group.read_filter(&cols, &predicate, delete_predicates.as_slice());
+            assert_eq!(format!("{:?}", &results), expected.join("\n"));
+        }
+    }
+
     #[test]
     fn read_aggregate() {
         let mut columns = vec![];
diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs
index 8460d8a880..94663bd99f 100644
--- a/read_buffer/src/table.rs
+++ b/read_buffer/src/table.rs
@@ -280,6 +280,7 @@ impl Table {
         &'a self,
         columns: &Selection<'_>,
         predicate: &Predicate,
+        delete_predicates: &[Predicate],
     ) -> ReadFilterResults {
         // identify row groups where time range and predicates match could match
         // the predicate. Get a snapshot of those and the meta-data.
@@ -296,6 +297,7 @@ impl Table {
         // TODO(edd): I think I can remove `predicates` from the results
         ReadFilterResults {
             predicate: predicate.clone(),
+            delete_predicates: delete_predicates.to_vec(),
             schema,
             row_groups,
         }
@@ -841,9 +843,9 @@ pub struct ReadFilterResults {
     // These row groups passed the predicates and need to be queried.
     row_groups: Vec<Arc<RowGroup>>,
 
-    // TODO(edd): encapsulate this into a single executor function that just
-    // executes on the next row group.
     predicate: Predicate,
+
+    delete_predicates: Vec<Predicate>,
 }
 
 impl ReadFilterResults {
@@ -872,7 +874,13 @@ impl ReadFilterResults {
 
         self.row_groups
             .iter()
-            .map(|row_group| row_group.read_filter(select_columns, &self.predicate))
+            .map(|row_group| {
+                row_group.read_filter(
+                    select_columns,
+                    &self.predicate,
+                    self.delete_predicates.as_slice(),
+                )
+            })
             .filter(|result| !result.is_empty())
             .collect()
     }
@@ -894,6 +902,7 @@ impl Iterator for ReadFilterResults {
                 .map(|name| name.as_str())
                 .collect::<Vec<_>>(),
             &self.predicate,
+            &self.delete_predicates,
         );
 
         if result.is_empty() {
@@ -1336,7 +1345,11 @@ mod test {
 
         // Get all the results
         let predicate = Predicate::with_time_range(&[], 1, 31);
-        let results = table.read_filter(&Selection::Some(&["time", "count", "region"]), &predicate);
+        let results = table.read_filter(
+            &Selection::Some(&["time", "count", "region"]),
+            &predicate,
+            &[],
+        );
 
         // check the column types
         let exp_schema = ResultSchema {
@@ -1382,7 +1395,7 @@ mod test {
             Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "south"))], 1, 25);
 
         // Apply a predicate `WHERE "region" != "south"`
-        let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate);
+        let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[]);
 
         let exp_schema = ResultSchema {
             select_columns: vec![