feat: make group keys comparable and results sortable

This commit provides functionality on top of the `GroupKey` type (a
vector of materialised values), which allows them to be comparable by
implementing `Ord`.

Then, using the `permutation` crate, it is possible to sort all rows in a
result set based on the group keys, which will be useful for testing.
pull/24376/head
Edd Robinson 2020-12-09 10:06:23 +00:00
parent 595d13956d
commit 0d60102c74
3 changed files with 150 additions and 86 deletions

7
Cargo.lock generated
View File

@ -2129,6 +2129,12 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "permutation"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9978962f8a4b158e97447a6d09d2d75e206d2994eff056c894019f362b27142"
[[package]] [[package]]
name = "petgraph" name = "petgraph"
version = "0.5.1" version = "0.5.1"
@ -2789,6 +2795,7 @@ dependencies = [
"either", "either",
"itertools 0.9.0", "itertools 0.9.0",
"packers", "packers",
"permutation",
"rand", "rand",
"rand_distr", "rand_distr",
] ]

View File

@ -14,6 +14,7 @@ packers = { path = "../packers" }
croaring = "0.4.5" croaring = "0.4.5"
itertools = "0.9.0" itertools = "0.9.0"
either = "1.6.1" either = "1.6.1"
permutation = "0.2.5"
[dev-dependencies] [dev-dependencies]
criterion = "0.3.3" criterion = "0.3.3"

View File

@ -335,81 +335,98 @@ impl Segment {
) )
} }
/// Returns a set of group keys and aggregated column data associated with /// Right now, predicates are treated conjunctive (AND) predicates. `read_group`
/// them. /// does not guarantee any sort order. Ordering of results should be handled
/// /// high up in the `Table` section of the segment store, where multiple
/// Right now, predicates are conjunctive (AND). /// segment results may need to be merged.
pub fn read_group( pub fn read_group(
&self, &self,
predicates: &[Predicate<'_>], predicates: &[Predicate<'_>],
group_columns: &[ColumnName<'_>], group_columns: &[ColumnName<'_>],
aggregates: &[(ColumnName<'_>, AggregateType)], aggregates: &[(ColumnName<'_>, AggregateType)],
) -> ReadGroupResult<'_> { ) -> ReadGroupResult<'_> {
// `ReadGroupResult`s should have the same lifetime as self. Alternatively
// ReadGroupResult could not store references to input data and put the
// responsibility on the caller to tie result data and input data
// together, but the convenience seems useful for now.
let mut result = ReadGroupResult {
group_columns: group_columns
.iter()
.map(|name| {
let (column_name, col) = self.column_name_and_column(name);
column_name
})
.collect::<Vec<_>>(),
aggregate_columns: aggregates
.iter()
.map(|(name, typ)| {
let (column_name, col) = self.column_name_and_column(name);
(column_name, *typ)
})
.collect::<Vec<_>>(),
..ReadGroupResult::default()
};
// Handle case where there are no predicates and all the columns being // Handle case where there are no predicates and all the columns being
// grouped support constant-time expression of the row_ids belonging to // grouped support constant-time expression of the row_ids belonging to
// each grouped value. // each grouped value.
let all_group_cols_pre_computed = group_columns.iter().all(|col_name| { let all_group_cols_pre_computed = result.group_columns.iter().all(|name| {
self.column_by_name(col_name) self.column_by_name(name)
.properties() .properties()
.has_pre_computed_row_ids .has_pre_computed_row_ids
}); });
if predicates.is_empty() && all_group_cols_pre_computed { if predicates.is_empty() && all_group_cols_pre_computed {
return self.read_group_all_rows_all_rle(group_columns, aggregates); self.read_group_all_rows_all_rle(&mut result);
return result;
} }
// If there is a single group column then we can use an optimised // If there is a single group column then we can use an optimised
// approach for building group keys // approach for building group keys
if group_columns.len() == 1 { if group_columns.len() == 1 {
return self.read_group_single_group_column(predicates, group_columns[0], aggregates); self.read_group_single_group_column(predicates, &mut result);
return result;
} }
// Perform the group by using a hashmap // Perform the group by using a hashmap
self.read_group_hash(predicates, group_columns, aggregates) self.read_group_hash(predicates, &mut result);
result
} }
fn read_group_hash( fn read_group_hash<'a>(&'a self, predicates: &[Predicate<'_>], dst: &mut ReadGroupResult<'a>) {
&self,
predicates: &[Predicate<'_>],
group_columns: &[ColumnName<'_>],
aggregates: &[(ColumnName<'_>, AggregateType)],
) -> ReadGroupResult<'_> {
let row_ids = self.row_ids_from_predicates(predicates); let row_ids = self.row_ids_from_predicates(predicates);
let filter_row_ids = match row_ids { let filter_row_ids = match row_ids {
RowIDsOption::None(_) => { RowIDsOption::None(_) => return, // no matching rows
return ReadGroupResult {
group_columns,
aggregate_columns: aggregates,
..ReadGroupResult::default()
}
} // no matching rows
RowIDsOption::Some(row_ids) => Some(row_ids.to_vec()), RowIDsOption::Some(row_ids) => Some(row_ids.to_vec()),
RowIDsOption::All(row_ids) => None, RowIDsOption::All(row_ids) => None,
}; };
let group_cols_num = dst.group_columns.len();
let agg_cols_num = dst.aggregate_columns.len();
// materialise all *encoded* values for each column we are grouping on. // materialise all *encoded* values for each column we are grouping on.
// These will not be the logical (typically string) values, but will be // These will not be the logical (typically string) values, but will be
// vectors of integers representing the physical values. // vectors of integers representing the physical values.
let mut groupby_encoded_ids = Vec::with_capacity(group_columns.len()); let mut groupby_encoded_ids = Vec::with_capacity(group_cols_num);
for name in group_columns { for name in &dst.group_columns {
let col = self.column_by_name(name); let col = self.column_by_name(name);
let mut dst_buf = EncodedValues::with_capacity_u32(col.num_rows() as usize); let mut encoded_values_buf = EncodedValues::with_capacity_u32(col.num_rows() as usize);
// do we want some rows for the column or all of them? // do we want some rows for the column or all of them?
match &filter_row_ids { match &filter_row_ids {
Some(row_ids) => { Some(row_ids) => {
dst_buf = col.encoded_values(row_ids, dst_buf); encoded_values_buf = col.encoded_values(row_ids, encoded_values_buf);
} }
None => { None => {
// None implies "no partial set of row ids" meaning get all of them. // None implies "no partial set of row ids" meaning get all of them.
dst_buf = col.all_encoded_values(dst_buf); encoded_values_buf = col.all_encoded_values(encoded_values_buf);
} }
} }
groupby_encoded_ids.push(dst_buf); groupby_encoded_ids.push(encoded_values_buf);
} }
// Materialise decoded values in aggregate columns. // Materialise decoded values in aggregate columns.
let mut aggregate_columns_data = Vec::with_capacity(aggregates.len()); let mut aggregate_columns_data = Vec::with_capacity(agg_cols_num);
for (name, agg_type) in aggregates { for (name, agg_type) in &dst.aggregate_columns {
let col = self.column_by_name(name); let col = self.column_by_name(name);
// TODO(edd): this materialises a column per aggregate. If there are // TODO(edd): this materialises a column per aggregate. If there are
@ -432,7 +449,7 @@ impl Segment {
// key_buf will be used as a temporary buffer for group keys, which are // key_buf will be used as a temporary buffer for group keys, which are
// themselves integers. // themselves integers.
let mut key_buf = Vec::with_capacity(group_columns.len()); let mut key_buf = Vec::with_capacity(group_cols_num);
key_buf.resize(key_buf.capacity(), 0); key_buf.resize(key_buf.capacity(), 0);
for row in 0..groupby_encoded_ids[0].len() { for row in 0..groupby_encoded_ids[0].len() {
@ -456,8 +473,8 @@ impl Segment {
// this vector will hold aggregates for this group key, which // this vector will hold aggregates for this group key, which
// will be updated as the rows in the aggregate columns are // will be updated as the rows in the aggregate columns are
// iterated. // iterated.
let mut group_key_aggs = Vec::with_capacity(aggregates.len()); let mut group_key_aggs = Vec::with_capacity(agg_cols_num);
for (_, agg_type) in aggregates { for (_, agg_type) in &dst.aggregate_columns {
group_key_aggs.push(AggregateResult::from(agg_type)); group_key_aggs.push(AggregateResult::from(agg_type));
} }
@ -478,11 +495,12 @@ impl Segment {
// Finally, build results set. Each encoded group key needs to be // Finally, build results set. Each encoded group key needs to be
// materialised into a logical group key // materialised into a logical group key
let columns = group_columns let columns = dst
.group_columns
.iter() .iter()
.map(|name| self.column_by_name(name)) .map(|name| self.column_by_name(name))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut group_key_vec = Vec::with_capacity(groups.len()); let mut group_key_vec: Vec<GroupKey<'_>> = Vec::with_capacity(groups.len());
let mut aggregate_vec = Vec::with_capacity(groups.len()); let mut aggregate_vec = Vec::with_capacity(groups.len());
for (group_key, aggs) in groups.into_iter() { for (group_key, aggs) in groups.into_iter() {
@ -492,16 +510,13 @@ impl Segment {
logical_key.push(columns[col_idx].decode_id(encoded_id as u32)); logical_key.push(columns[col_idx].decode_id(encoded_id as u32));
} }
group_key_vec.push(logical_key); group_key_vec.push(GroupKey(logical_key));
aggregate_vec.push(aggs.clone()); aggregate_vec.push(aggs.clone());
} }
ReadGroupResult { // update results
group_columns, dst.group_keys = group_key_vec;
aggregate_columns: aggregates, dst.aggregates = aggregate_vec;
group_keys: group_key_vec,
aggregates: aggregate_vec,
}
} }
// Optimised read group method when there are no predicates and all the group // Optimised read group method when there are no predicates and all the group
@ -509,35 +524,24 @@ impl Segment {
// //
// In this case all the grouping columns pre-computed bitsets for each // In this case all the grouping columns pre-computed bitsets for each
// distinct value. // distinct value.
fn read_group_all_rows_all_rle( fn read_group_all_rows_all_rle<'a>(&'a self, dst: &mut ReadGroupResult<'a>) {
&self, let group_columns = dst
group_names: &[ColumnName<'_>], .group_columns
aggregate_names_and_types: &[(ColumnName<'_>, AggregateType)],
) -> ReadGroupResult<'_> {
let (group_column_names, group_columns): (Vec<_>, Vec<_>) = group_names
.iter() .iter()
.map(|name| self.column_name_and_column(name)) .map(|name| self.column_by_name(name))
.unzip();
let (aggregate_column_names_and_types, aggregate_columns): (Vec<_>, Vec<_>) =
aggregate_names_and_types
.iter()
.map(|(name, typ)| {
let (column_name, col) = self.column_name_and_column(name);
((column_name, *typ), (col, *typ))
})
.unzip();
let encoded_groups = group_columns
.iter()
.map(|col| col.grouped_row_ids().unwrap_left())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut result = ReadGroupResult { let aggregate_columns_typ = dst
group_columns: group_column_names, .aggregate_columns
aggregate_columns: aggregate_column_names_and_types, .iter()
..ReadGroupResult::default() .map(|(name, typ)| (self.column_by_name(name), *typ))
}; .collect::<Vec<_>>();
let encoded_groups = dst
.group_columns
.iter()
.map(|name| self.column_by_name(name).grouped_row_ids().unwrap_left())
.collect::<Vec<_>>();
// multi_cartesian_product will create the cartesian product of all // multi_cartesian_product will create the cartesian product of all
// grouping-column values. This is likely going to be more group keys // grouping-column values. This is likely going to be more group keys
@ -605,10 +609,10 @@ impl Segment {
for (col_idx, &encoded_id) in group_key.iter().enumerate() { for (col_idx, &encoded_id) in group_key.iter().enumerate() {
material_key.push(group_columns[col_idx].decode_id(encoded_id as u32)); material_key.push(group_columns[col_idx].decode_id(encoded_id as u32));
} }
result.group_keys.push(material_key); dst.group_keys.push(GroupKey(material_key));
let mut aggregates = Vec::with_capacity(aggregate_columns.len()); let mut aggregates = Vec::with_capacity(aggregate_columns_typ.len());
for (agg_col, typ) in &aggregate_columns { for (agg_col, typ) in &aggregate_columns_typ {
aggregates.push(match typ { aggregates.push(match typ {
AggregateType::Count => { AggregateType::Count => {
AggregateResult::Count(agg_col.count(&aggregate_row_ids.to_vec()) as u64) AggregateResult::Count(agg_col.count(&aggregate_row_ids.to_vec()) as u64)
@ -626,10 +630,8 @@ impl Segment {
} }
}); });
} }
result.aggregates.push(aggregates); dst.aggregates.push(aggregates);
} }
result
} }
// Optimised read group method where only a single column is being used as // Optimised read group method where only a single column is being used as
@ -639,9 +641,8 @@ impl Segment {
fn read_group_single_group_column( fn read_group_single_group_column(
&self, &self,
predicates: &[Predicate<'_>], predicates: &[Predicate<'_>],
group_column: ColumnName<'_>, dst: &mut ReadGroupResult<'_>,
aggregates: &[(ColumnName<'_>, AggregateType)], ) {
) -> ReadGroupResult<'_> {
todo!() todo!()
} }
@ -655,7 +656,7 @@ impl Segment {
predicates: &[Predicate<'_>], predicates: &[Predicate<'_>],
group_column: ColumnName<'_>, group_column: ColumnName<'_>,
aggregates: &[(ColumnName<'_>, AggregateType)], aggregates: &[(ColumnName<'_>, AggregateType)],
) -> ReadGroupResult<'_> { ) {
todo!() todo!()
} }
} }
@ -664,7 +665,49 @@ pub type Predicate<'a> = (ColumnName<'a>, (Operator, Value<'a>));
// A GroupKey is an ordered collection of row values. The order determines which // A GroupKey is an ordered collection of row values. The order determines which
// columns the values originated from. // columns the values originated from.
pub type GroupKey<'a> = Vec<Value<'a>>; #[derive(PartialEq, PartialOrd, Clone)]
pub struct GroupKey<'segment>(Vec<Value<'segment>>);
// `Eq` is implemented manually rather than derived — presumably because
// `Value` only provides `PartialEq`/`PartialOrd` (e.g. it may contain float
// variants, which are not `Eq`). NOTE(review): confirm against the `Value`
// definition.
impl Eq for GroupKey<'_> {}
// Implementing the `Ord` trait on `GroupKey` means that collections of group
// keys become sortable. This is typically useful for testing because,
// depending on the implementation, group keys are not always emitted in
// sorted order.
//
// To be compared, group keys *must* have the same length, or `cmp` will panic.
// They are ordered lexicographically, column by column. For example:
//
//    [foo, zoo, zoo],
//    [foo, bar, zoo],
//    [bar, bar, bar],
//    [bar, bar, zoo],
//
// becomes:
//
//    [bar, bar, bar],
//    [bar, bar, zoo],
//    [foo, bar, zoo],
//    [foo, zoo, zoo],
//
// Be careful sorting group keys in result sets, because other columns
// associated with the group keys won't be sorted unless the correct `sort`
// methods are used on the result set implementations.
impl Ord for GroupKey<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Two group keys must have the same length to be comparable.
        assert_eq!(self.0.len(), other.0.len());

        for (a, b) in self.0.iter().zip(other.0.iter()) {
            match a.partial_cmp(b) {
                // An equal column pair does not decide the ordering; move on
                // to the next column. (Previously this arm fell through to
                // `return ord`, which made any two keys sharing an equal
                // first column compare as `Equal` — breaking lexicographic
                // ordering and disagreeing with the derived `PartialOrd`.)
                Some(std::cmp::Ordering::Equal) => continue,
                // The first non-equal pair of columns decides the ordering
                // of the whole key.
                Some(ord) => return ord,
                // Incomparable values are skipped, preserving the original
                // behaviour. NOTE(review): consider panicking here instead —
                // confirm whether `Value` can actually be incomparable.
                None => continue,
            }
        }

        // Every column compared equal.
        std::cmp::Ordering::Equal
    }
}
// A representation of a column name. // A representation of a column name.
pub type ColumnName<'a> = &'a str; pub type ColumnName<'a> = &'a str;
@ -857,6 +900,18 @@ impl ReadGroupResult<'_> {
pub fn cardinality(&self) -> usize { pub fn cardinality(&self) -> usize {
self.group_keys.len() self.group_keys.len()
} }
/// Sorts the rows of this result set in place, ordered lexicographically by
/// the group key columns.
///
/// This is primarily useful for testing, where `read_group` results produced
/// by different execution paths need to be compared irrespective of the
/// order in which groups were emitted.
pub fn sort(&mut self) {
    // `permutation::sort` sorts anything implementing `Ord` and hands back
    // the resulting sort order as a reusable permutation, which lets the
    // exact same reordering be applied to the aggregate rows so they stay
    // aligned with their group keys.
    let ordering = permutation::sort(self.group_keys.as_slice());
    self.aggregates = ordering.apply_slice(self.aggregates.as_slice());
    self.group_keys = ordering.apply_slice(self.group_keys.as_slice());
}
} }
impl std::fmt::Debug for &ReadGroupResult<'_> { impl std::fmt::Debug for &ReadGroupResult<'_> {
@ -894,7 +949,7 @@ impl std::fmt::Display for &ReadGroupResult<'_> {
} }
// write row for group by columns // write row for group by columns
for value in &self.group_keys[row] { for value in self.group_keys[row].0.iter() {
write!(f, "{},", value)?; write!(f, "{},", value)?;
} }
@ -1184,7 +1239,8 @@ west,prod,GET
]; ];
for (predicate, group_cols, aggs, expected) in cases { for (predicate, group_cols, aggs, expected) in cases {
let results = segment.read_group(&predicate, &group_cols, &aggs); let mut results = segment.read_group(&predicate, &group_cols, &aggs);
results.sort();
assert_eq!(format!("{:?}", &results), expected); assert_eq!(format!("{:?}", &results), expected);
} }
} }
@ -1357,11 +1413,11 @@ west,POST,304,101,203
group_columns, group_columns,
aggregate_columns, aggregate_columns,
group_keys: vec![ group_keys: vec![
vec![Value::String("east"), Value::String("host-a")], GroupKey(vec![Value::String("east"), Value::String("host-a")]),
vec![Value::String("east"), Value::String("host-b")], GroupKey(vec![Value::String("east"), Value::String("host-b")]),
vec![Value::String("west"), Value::String("host-a")], GroupKey(vec![Value::String("west"), Value::String("host-a")]),
vec![Value::String("west"), Value::String("host-c")], GroupKey(vec![Value::String("west"), Value::String("host-c")]),
vec![Value::String("west"), Value::String("host-d")], GroupKey(vec![Value::String("west"), Value::String("host-d")]),
], ],
aggregates: vec![ aggregates: vec![
vec![ vec![