feat: add delete support to row_group:

pull/24376/head
Edd Robinson 2021-08-25 22:57:26 +01:00
parent 95548dcec9
commit dbbfd2a9f8
3 changed files with 324 additions and 34 deletions

View File

@ -156,7 +156,7 @@ impl Chunk {
predicate: Predicate,
select_columns: Selection<'_>,
) -> table::ReadFilterResults {
self.table.read_filter(&select_columns, &predicate)
self.table.read_filter(&select_columns, &predicate, &[])
}
/// Returns an iterable collection of data in group columns and aggregate
@ -949,8 +949,7 @@ mod test {
);
}
#[test]
fn read_filter() {
fn read_filter_setup() -> Chunk {
let mut chunk: Option<Chunk> = None;
// Add a bunch of row groups to a single table in a single chunk
@ -1007,9 +1006,13 @@ mod test {
}
}
}
chunk.unwrap()
}
#[test]
fn read_filter() {
// Chunk should be initialized now.
let chunk = chunk.unwrap();
let chunk = read_filter_setup();
// Build the operation equivalent to the following query:
//
@ -1058,6 +1061,59 @@ mod test {
assert!(itr.next().is_none());
}
#[test]
fn read_filter_with_deletes() {
// Chunk should be initialized now.
let chunk = read_filter_setup();
// Build the operation equivalent to the following query:
//
// SELECT * FROM "table_1" WHERE "env" = 'us-west';
//
// But also assume the following delete has been applied:
//
// DELETE FROM "table_1" WHERE "region" = "west"
//
let predicate = Predicate::new(vec![BinaryExpr::from(("env", "=", "us-west"))]);
let mut itr = chunk.read_filter(predicate, Selection::All);
let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
let exp_region_values = Values::Dictionary(vec![0], vec![Some("east")]);
let exp_counter_values = Values::F64(vec![4500.3]);
let exp_sketchy_sensor_values = Values::I64N(vec![Some(44)]);
let exp_active_values = Values::Bool(vec![Some(false)]);
let exp_msg_values = Values::String(vec![None]);
let first_row_group = itr.next().unwrap();
assert_rb_column_equals(&first_row_group, "env", &exp_env_values);
assert_rb_column_equals(&first_row_group, "region", &exp_region_values);
assert_rb_column_equals(&first_row_group, "counter", &exp_counter_values);
assert_rb_column_equals(
&first_row_group,
"sketchy_sensor",
&exp_sketchy_sensor_values,
);
assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
assert_rb_column_equals(&first_row_group, "msg", &exp_msg_values);
assert_rb_column_equals(&first_row_group, "time", &Values::I64(vec![100])); // first row from first record batch
let second_row_group = itr.next().unwrap();
assert_rb_column_equals(&second_row_group, "env", &exp_env_values);
assert_rb_column_equals(&second_row_group, "region", &exp_region_values);
assert_rb_column_equals(&second_row_group, "counter", &exp_counter_values);
assert_rb_column_equals(
&first_row_group,
"sketchy_sensor",
&exp_sketchy_sensor_values,
);
assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
assert_rb_column_equals(&second_row_group, "time", &Values::I64(vec![200])); // first row from second record batch
// No more data
assert!(itr.next().is_none());
}
#[test]
fn could_pass_predicate() {
let chunk = ChunkBuilder::default().build();

View File

@ -259,7 +259,8 @@ impl RowGroup {
pub fn read_filter(
&self,
columns: &[ColumnName<'_>],
predicates: &Predicate,
predicate: &Predicate,
delete_predicates: &[Predicate],
) -> ReadFilterResult<'_> {
let select_columns = self.meta.schema_for_column_names(columns);
assert_eq!(select_columns.len(), columns.len());
@ -269,9 +270,50 @@ impl RowGroup {
..Default::default()
};
// apply predicates to determine candidate rows.
let row_ids = self.row_ids_from_predicate(predicates);
let col_data = self.materialise_rows(&schema, row_ids);
// apply predicate to determine candidate rows.
let row_ids = self.row_ids_from_predicate(predicate);
// identify rows that have been marked as deleted.
let deleted_row_ids = self.row_ids_from_delete_predicates(delete_predicates);
// determine final candidate rows
let final_row_ids = match (dbg!(row_ids), dbg!(deleted_row_ids)) {
// no matching rows
(RowIDsOption::None(_), _) => RowIDsOption::new_none(),
// everything marked deleted
(_, RowIDsOption::All(_)) => RowIDsOption::new_none(),
// nothing to delete
(row_ids, RowIDsOption::None(_)) => row_ids,
// in these cases some rows have been deleted
(RowIDsOption::Some(mut row_ids), RowIDsOption::Some(delete_row_ids)) => {
row_ids.relative_complement(&delete_row_ids);
if row_ids.is_empty() {
RowIDsOption::new_none()
} else {
RowIDsOption::Some(row_ids)
}
}
(RowIDsOption::All(mut row_ids), RowIDsOption::Some(delete_row_ids)) => {
// Recall that the `All` variant for `RowIDsOption` is an
// optimisation to represent all row IDs in the column without
// having to materialise the bitset. In this case however, we
// will have to materialise the bitset in order to calculate the
// relative complement.
row_ids.add_range(0, self.rows());
row_ids.relative_complement(&delete_row_ids);
// N.B we can't remove all rows since there are more selected
// rows than deleted rows - we always end up deleting no rows
// or some rows.
if row_ids.len() == self.rows() as usize {
RowIDsOption::All(row_ids) // all selected rows remain and they're all rows in column
} else {
RowIDsOption::Some(row_ids)
}
}
};
let col_data = self.materialise_rows(&schema, final_row_ids);
ReadFilterResult {
schema,
data: col_data,
@ -416,6 +458,31 @@ impl RowGroup {
)
}
// Determines the set of row ids that satisfy *any* of the provided
// collection of predicates. To be included in the returned set of row IDs
// a row must satisfy all expressions within a single predicate, but need
// not satisfy more than one of the predicates.
fn row_ids_from_delete_predicates(&self, predicates: &[Predicate]) -> RowIDsOption {
if predicates.is_empty() {
return RowIDsOption::new_none();
}
let mut result_row_ids = RowIDs::new_bitmap();
for predicate in predicates {
match self.row_ids_from_predicate(predicate) {
RowIDsOption::None(_) => continue, // no rows match this predicate
RowIDsOption::Some(row_ids) => result_row_ids.union(&row_ids), // add row IDs to final result
RowIDsOption::All(row_ids) => return RowIDsOption::All(row_ids), // everything deleted
}
}
match result_row_ids.len() {
0 => RowIDsOption::None(result_row_ids),
x if x == self.rows() as usize => RowIDsOption::All(result_row_ids),
_ => RowIDsOption::Some(result_row_ids),
}
}
/// Materialises a collection of data in group columns and aggregate
/// columns, optionally filtered by the provided predicate.
///
@ -2457,7 +2524,104 @@ mod test {
}
#[test]
fn read_filter() {
fn row_ids_from_delete_predicates() {
let mut columns = vec![];
let tc = ColumnType::Time(Column::from(&[100_i64, 200, 500, 600, 300, 300][..]));
columns.push(("time".to_string(), tc));
let rc = ColumnType::Tag(Column::from(
&["west", "west", "east", "west", "south", "north"][..],
));
columns.push(("region".to_string(), rc));
let row_group = RowGroup::new(6, columns);
// No predicates means nothing to delete
let row_ids = row_group.row_ids_from_delete_predicates(&[]);
assert!(matches!(row_ids, RowIDsOption::None(_)));
// A predicate that doesn't match anything
let row_ids = row_group.row_ids_from_delete_predicates(&[Predicate::with_time_range(
&[],
1000,
1200,
)]);
assert!(matches!(row_ids, RowIDsOption::None(_)));
// Table looks like:
//
// region | time
// 0 west | 100
// 1 west | 200
// 2 east | 500
// 3 west | 600
// 4 south | 300
// 5 north | 300
// cases that will mark rows deleted
let cases = vec![
// Partially covering "time range" predicate
(
vec![Predicate::with_time_range(&[], 200, 600)],
vec![1, 2, 4, 5],
),
// Partially covering "region" predicate
(
vec![Predicate::new(vec![BinaryExpr::from((
"region", "=", "west",
))])],
vec![0, 1, 3],
),
// Two separate partially covering "region" predicates
(
vec![
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
Predicate::new(vec![BinaryExpr::from(("region", "=", "south"))]),
],
vec![0, 1, 3, 4], // all rows with region=west OR region=south
),
// A partially covering "region" predicates with a conjunctive time expression
// And a separate "region" predicate. This example is equivalent to
// two delete commands like:
//
// DELETE FROM t WHERE region = "west" AND time < 500;
// DELETE FROM t WHERE region = "south";
(
vec![
Predicate::new(vec![
BinaryExpr::from(("region", "=", "west")),
BinaryExpr::from(("time", "<", 500_i64)),
]),
Predicate::new(vec![BinaryExpr::from(("region", "=", "south"))]),
],
vec![0, 1, 4], // any row with (region=west AND time < 500) OR any rows with region = south
),
// a predicate that fully covers the column.
(
vec![Predicate::new(vec![BinaryExpr::from((
"region",
"!=",
"south-west",
))])],
vec![0, 1, 2, 3, 4, 5], // all rows should be marked as deleted
),
// multiple predicates that in union cover the column
(
vec![
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
Predicate::new(vec![BinaryExpr::from(("region", "=", "south"))]),
Predicate::new(vec![BinaryExpr::from(("region", "=", "north"))]),
Predicate::new(vec![BinaryExpr::from(("region", "<", "easter"))]),
],
vec![0, 1, 2, 3, 4, 5], // all rows should be marked as deleted
),
];
for (delete_predicates, exp_row_ids) in cases {
let row_ids = row_group.row_ids_from_delete_predicates(delete_predicates.as_slice());
assert_eq!(row_ids.unwrap().to_vec(), exp_row_ids);
}
}
fn _read_filter_setup() -> RowGroup {
let mut columns = vec![];
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
columns.push(("time".to_string(), tc));
@ -2475,8 +2639,12 @@ mod test {
let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10][..]));
columns.push(("count".to_string(), fc));
let row_group = RowGroup::new(6, columns);
RowGroup::new(6, columns)
}
#[test]
fn read_filter() {
let row_group = _read_filter_setup();
let cases = vec![
(
vec!["count", "region", "time"],
@ -2533,7 +2701,7 @@ west,4
];
for (cols, predicates, expected) in cases {
let results = row_group.read_filter(&cols, &predicates);
let results = row_group.read_filter(&cols, &predicates, &[]);
assert_eq!(format!("{:?}", &results), expected);
}
@ -2541,29 +2709,14 @@ west,4
let results = row_group.read_filter(
&["method", "region", "time"],
&Predicate::with_time_range(&[], -19, 1),
&[],
);
assert!(results.is_empty());
}
#[test]
fn read_filter_dictionaries() {
let mut columns = vec![];
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
columns.push(("time".to_string(), tc));
// Tag column that will be dictionary encoded when materialised
let rc = ColumnType::Tag(Column::from(
&["west", "west", "east", "west", "south", "north"][..],
));
columns.push(("region".to_string(), rc));
// Field column that will be stored as a string array when materialised
let mc = ColumnType::Field(Column::from(
&["GET", "POST", "POST", "POST", "PUT", "GET"][..],
));
columns.push(("method".to_string(), mc));
let row_group = RowGroup::new(6, columns);
let row_group = _read_filter_setup();
let cases = vec![
(
@ -2589,7 +2742,7 @@ POST,west,2
];
for (cols, predicates, expected) in cases {
let results = row_group.read_filter(&cols, &predicates);
let results = row_group.read_filter(&cols, &predicates, &[]);
assert_eq!(format!("{:?}", &results), expected);
}
}
@ -2624,10 +2777,78 @@ POST,west,2
let results = row_group.read_filter(
&["state"],
&Predicate::with_time_range(&[BinaryExpr::from(("state", "=", "NY"))], 1, 300),
&[],
);
assert!(results.is_empty());
}
#[test]
fn read_filter_with_deletes() {
let row_group = _read_filter_setup();
// These cases enumerate the possible match states in `read_filter`
let cases = vec![
// select all rows and delete all rows
(
vec!["count", "region", "time"],
Predicate::with_time_range(&[], 1, 7),
vec![Predicate::with_time_range(&[], 0, 10)],
vec!["count,region,time", ""],
),
// select some rows and delete no matching rows
(
vec!["count", "region", "time"],
Predicate::with_time_range(&[], 2, 4),
vec![Predicate::with_time_range(&[], 200, 300)],
vec!["count,region,time", "101,west,2", "200,east,3", ""],
),
// select some rows and delete some others
(
vec!["count", "region", "time"],
Predicate::with_time_range(&[], 1, 6),
vec![Predicate::with_time_range(&[], 2, 4)], // delete rows 101,west,2 and 200,east,3
vec![
"count,region,time",
"100,west,1",
"203,west,4",
"203,south,5",
"",
],
),
// select some rows and delete some others that are not selected
(
vec!["count", "region", "time"],
Predicate::with_time_range(&[], 3, 6),
vec![Predicate::with_time_range(&[], 0, 2)], // delete rows that are not in the result set
vec![
"count,region,time",
"200,east,3",
"203,west,4",
"203,south,5",
"",
],
),
// select all rows and delete some others
(
vec!["count", "region", "time"],
Predicate::with_time_range(&[], 1, 7),
vec![Predicate::with_time_range(&[], 2, 4)], // delete rows 101,west,2 and 200,east,3
vec![
"count,region,time",
"100,west,1",
"203,west,4",
"203,south,5",
"10,north,6",
"",
],
),
];
for (cols, predicate, delete_predicates, expected) in cases {
let results = row_group.read_filter(&cols, &predicate, delete_predicates.as_slice());
assert_eq!(format!("{:?}", &results), expected.join("\n"));
}
}
#[test]
fn read_aggregate() {
let mut columns = vec![];

View File

@ -280,6 +280,7 @@ impl Table {
&'a self,
columns: &Selection<'_>,
predicate: &Predicate,
delete_predicates: &[Predicate],
) -> ReadFilterResults {
// identify row groups where time range and predicates match could match
// the predicate. Get a snapshot of those and the meta-data.
@ -296,6 +297,7 @@ impl Table {
// TODO(edd): I think I can remove `predicates` from the results
ReadFilterResults {
predicate: predicate.clone(),
delete_predicates: delete_predicates.to_vec(),
schema,
row_groups,
}
@ -841,9 +843,9 @@ pub struct ReadFilterResults {
// These row groups passed the predicates and need to be queried.
row_groups: Vec<Arc<RowGroup>>,
// TODO(edd): encapsulate this into a single executor function that just
// executes on the next row group.
predicate: Predicate,
delete_predicates: Vec<Predicate>,
}
impl ReadFilterResults {
@ -872,7 +874,13 @@ impl ReadFilterResults {
self.row_groups
.iter()
.map(|row_group| row_group.read_filter(select_columns, &self.predicate))
.map(|row_group| {
row_group.read_filter(
select_columns,
&self.predicate,
self.delete_predicates.as_slice(),
)
})
.filter(|result| !result.is_empty())
.collect()
}
@ -894,6 +902,7 @@ impl Iterator for ReadFilterResults {
.map(|name| name.as_str())
.collect::<Vec<_>>(),
&self.predicate,
&self.delete_predicates,
);
if result.is_empty() {
@ -1336,7 +1345,11 @@ mod test {
// Get all the results
let predicate = Predicate::with_time_range(&[], 1, 31);
let results = table.read_filter(&Selection::Some(&["time", "count", "region"]), &predicate);
let results = table.read_filter(
&Selection::Some(&["time", "count", "region"]),
&predicate,
&[],
);
// check the column types
let exp_schema = ResultSchema {
@ -1382,7 +1395,7 @@ mod test {
Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "south"))], 1, 25);
// Apply a predicate `WHERE "region" != "south"`
let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate);
let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[]);
let exp_schema = ResultSchema {
select_columns: vec![