diff --git a/Cargo.lock b/Cargo.lock index fc76dbc3c5..74118f4c09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2129,6 +2129,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "permutation" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9978962f8a4b158e97447a6d09d2d75e206d2994eff056c894019f362b27142" + [[package]] name = "petgraph" version = "0.5.1" @@ -2789,6 +2795,7 @@ dependencies = [ "either", "itertools 0.9.0", "packers", + "permutation", "rand", "rand_distr", ] diff --git a/segment_store/Cargo.toml b/segment_store/Cargo.toml index 87be7f8867..6ecdf3e796 100644 --- a/segment_store/Cargo.toml +++ b/segment_store/Cargo.toml @@ -14,6 +14,7 @@ packers = { path = "../packers" } croaring = "0.4.5" itertools = "0.9.0" either = "1.6.1" +permutation = "0.2.5" [dev-dependencies] criterion = "0.3.3" diff --git a/segment_store/src/segment.rs b/segment_store/src/segment.rs index 24055eb299..9addf36f69 100644 --- a/segment_store/src/segment.rs +++ b/segment_store/src/segment.rs @@ -335,81 +335,98 @@ impl Segment { ) } - /// Returns a set of group keys and aggregated column data associated with - /// them. - /// - /// Right now, predicates are conjunctive (AND). + /// Right now, predicates are treated conjunctive (AND) predicates. `read_group` + /// does not guarantee any sort order. Ordering of results should be handled + /// high up in the `Table` section of the segment store, where multiple + /// segment results may need to be merged. pub fn read_group( &self, predicates: &[Predicate<'_>], group_columns: &[ColumnName<'_>], aggregates: &[(ColumnName<'_>, AggregateType)], ) -> ReadGroupResult<'_> { + // `ReadGroupResult`s should have the same lifetime as self. Alternatively + // ReadGroupResult could not store references to input data and put the + // responsibility on the caller to tie result data and input data + // together, but the convenience seems useful for now. + let mut result = ReadGroupResult { + group_columns: group_columns + .iter() + .map(|name| { + let (column_name, col) = self.column_name_and_column(name); + column_name + }) + .collect::>(), + aggregate_columns: aggregates + .iter() + .map(|(name, typ)| { + let (column_name, col) = self.column_name_and_column(name); + (column_name, *typ) + }) + .collect::>(), + ..ReadGroupResult::default() + }; + // Handle case where there are no predicates and all the columns being // grouped support constant-time expression of the row_ids belonging to // each grouped value. - let all_group_cols_pre_computed = group_columns.iter().all(|col_name| { - self.column_by_name(col_name) + let all_group_cols_pre_computed = result.group_columns.iter().all(|name| { + self.column_by_name(name) .properties() .has_pre_computed_row_ids }); if predicates.is_empty() && all_group_cols_pre_computed { - return self.read_group_all_rows_all_rle(group_columns, aggregates); + self.read_group_all_rows_all_rle(&mut result); + return result; } // If there is a single group column then we can use an optimised // approach for building group keys if group_columns.len() == 1 { - return self.read_group_single_group_column(predicates, group_columns[0], aggregates); + self.read_group_single_group_column(predicates, &mut result); + return result; } // Perform the group by using a hashmap - self.read_group_hash(predicates, group_columns, aggregates) + self.read_group_hash(predicates, &mut result); + result } - fn read_group_hash( - &self, - predicates: &[Predicate<'_>], - group_columns: &[ColumnName<'_>], - aggregates: &[(ColumnName<'_>, AggregateType)], - ) -> ReadGroupResult<'_> { + fn read_group_hash<'a>(&'a self, predicates: &[Predicate<'_>], dst: &mut ReadGroupResult<'a>) { let row_ids = self.row_ids_from_predicates(predicates); let filter_row_ids = match row_ids { - RowIDsOption::None(_) => { - return ReadGroupResult { - group_columns, - aggregate_columns: aggregates, - ..ReadGroupResult::default() - } - } // no matching rows + RowIDsOption::None(_) => return, // no matching rows RowIDsOption::Some(row_ids) => Some(row_ids.to_vec()), RowIDsOption::All(row_ids) => None, }; + let group_cols_num = dst.group_columns.len(); + let agg_cols_num = dst.aggregate_columns.len(); + // materialise all *encoded* values for each column we are grouping on. // These will not be the logical (typically string) values, but will be // vectors of integers representing the physical values. - let mut groupby_encoded_ids = Vec::with_capacity(group_columns.len()); - for name in group_columns { + let mut groupby_encoded_ids = Vec::with_capacity(group_cols_num); + for name in &dst.group_columns { let col = self.column_by_name(name); - let mut dst_buf = EncodedValues::with_capacity_u32(col.num_rows() as usize); + let mut encoded_values_buf = EncodedValues::with_capacity_u32(col.num_rows() as usize); // do we want some rows for the column or all of them? match &filter_row_ids { Some(row_ids) => { - dst_buf = col.encoded_values(row_ids, dst_buf); + encoded_values_buf = col.encoded_values(row_ids, encoded_values_buf); } None => { // None implies "no partial set of row ids" meaning get all of them. - dst_buf = col.all_encoded_values(dst_buf); + encoded_values_buf = col.all_encoded_values(encoded_values_buf); } } - groupby_encoded_ids.push(dst_buf); + groupby_encoded_ids.push(encoded_values_buf); } // Materialise decoded values in aggregate columns. - let mut aggregate_columns_data = Vec::with_capacity(aggregates.len()); - for (name, agg_type) in aggregates { + let mut aggregate_columns_data = Vec::with_capacity(agg_cols_num); + for (name, agg_type) in &dst.aggregate_columns { let col = self.column_by_name(name); // TODO(edd): this materialises a column per aggregate. If there are @@ -432,7 +449,7 @@ impl Segment { // key_buf will be used as a temporary buffer for group keys, which are // themselves integers. - let mut key_buf = Vec::with_capacity(group_columns.len()); + let mut key_buf = Vec::with_capacity(group_cols_num); key_buf.resize(key_buf.capacity(), 0); for row in 0..groupby_encoded_ids[0].len() { @@ -456,8 +473,8 @@ impl Segment { // this vector will hold aggregates for this group key, which // will be updated as the rows in the aggregate columns are // iterated. - let mut group_key_aggs = Vec::with_capacity(aggregates.len()); - for (_, agg_type) in aggregates { + let mut group_key_aggs = Vec::with_capacity(agg_cols_num); + for (_, agg_type) in &dst.aggregate_columns { group_key_aggs.push(AggregateResult::from(agg_type)); } @@ -478,11 +495,12 @@ impl Segment { // Finally, build results set. Each encoded group key needs to be // materialised into a logical group key - let columns = group_columns + let columns = dst + .group_columns .iter() .map(|name| self.column_by_name(name)) .collect::>(); - let mut group_key_vec = Vec::with_capacity(groups.len()); + let mut group_key_vec: Vec> = Vec::with_capacity(groups.len()); let mut aggregate_vec = Vec::with_capacity(groups.len()); for (group_key, aggs) in groups.into_iter() { @@ -492,16 +510,13 @@ impl Segment { logical_key.push(columns[col_idx].decode_id(encoded_id as u32)); } - group_key_vec.push(logical_key); + group_key_vec.push(GroupKey(logical_key)); aggregate_vec.push(aggs.clone()); } - ReadGroupResult { - group_columns, - aggregate_columns: aggregates, - group_keys: group_key_vec, - aggregates: aggregate_vec, - } + // update results + dst.group_keys = group_key_vec; + dst.aggregates = aggregate_vec; } // Optimised read group method when there are no predicates and all the group @@ -509,35 +524,24 @@ impl Segment { // // In this case all the grouping columns pre-computed bitsets for each // distinct value. - fn read_group_all_rows_all_rle( - &self, - group_names: &[ColumnName<'_>], - aggregate_names_and_types: &[(ColumnName<'_>, AggregateType)], - ) -> ReadGroupResult<'_> { - let (group_column_names, group_columns): (Vec<_>, Vec<_>) = group_names + fn read_group_all_rows_all_rle<'a>(&'a self, dst: &mut ReadGroupResult<'a>) { + let group_columns = dst + .group_columns .iter() - .map(|name| self.column_name_and_column(name)) - .unzip(); - - let (aggregate_column_names_and_types, aggregate_columns): (Vec<_>, Vec<_>) = - aggregate_names_and_types - .iter() - .map(|(name, typ)| { - let (column_name, col) = self.column_name_and_column(name); - ((column_name, *typ), (col, *typ)) - }) - .unzip(); - - let encoded_groups = group_columns - .iter() - .map(|col| col.grouped_row_ids().unwrap_left()) + .map(|name| self.column_by_name(name)) .collect::>(); - let mut result = ReadGroupResult { - group_columns: group_column_names, - aggregate_columns: aggregate_column_names_and_types, - ..ReadGroupResult::default() - }; + let aggregate_columns_typ = dst + .aggregate_columns + .iter() + .map(|(name, typ)| (self.column_by_name(name), *typ)) + .collect::>(); + + let encoded_groups = dst + .group_columns + .iter() + .map(|name| self.column_by_name(name).grouped_row_ids().unwrap_left()) + .collect::>(); // multi_cartesian_product will create the cartesian product of all // grouping-column values. This is likely going to be more group keys @@ -605,10 +609,10 @@ impl Segment { for (col_idx, &encoded_id) in group_key.iter().enumerate() { material_key.push(group_columns[col_idx].decode_id(encoded_id as u32)); } - result.group_keys.push(material_key); + dst.group_keys.push(GroupKey(material_key)); - let mut aggregates = Vec::with_capacity(aggregate_columns.len()); - for (agg_col, typ) in &aggregate_columns { + let mut aggregates = Vec::with_capacity(aggregate_columns_typ.len()); + for (agg_col, typ) in &aggregate_columns_typ { aggregates.push(match typ { AggregateType::Count => { AggregateResult::Count(agg_col.count(&aggregate_row_ids.to_vec()) as u64) @@ -626,10 +630,8 @@ impl Segment { } }); } - result.aggregates.push(aggregates); + dst.aggregates.push(aggregates); } - - result } // Optimised read group method where only a single column is being used as @@ -639,9 +641,8 @@ impl Segment { fn read_group_single_group_column( &self, predicates: &[Predicate<'_>], - group_column: ColumnName<'_>, - aggregates: &[(ColumnName<'_>, AggregateType)], - ) -> ReadGroupResult<'_> { + dst: &mut ReadGroupResult<'_>, + ) { todo!() } @@ -655,7 +656,7 @@ impl Segment { predicates: &[Predicate<'_>], group_column: ColumnName<'_>, aggregates: &[(ColumnName<'_>, AggregateType)], - ) -> ReadGroupResult<'_> { + ) { todo!() } } @@ -664,7 +665,49 @@ pub type Predicate<'a> = (ColumnName<'a>, (Operator, Value<'a>)); // A GroupKey is an ordered collection of row values. The order determines which // columns the values originated from. -pub type GroupKey<'a> = Vec>; +#[derive(PartialEq, PartialOrd, Clone)] +pub struct GroupKey<'segment>(Vec>); + +impl Eq for GroupKey<'_> {} + +// Implementing the `Ord` trait on `GroupKey` means that collections of group +// keys become sortable. This is typically useful for test because depending +// on the implementation, group keys are not always emitted in sorted order. +// +// To be compared, group keys *must* have the same length, or `cmp` will panic. +// They will be ordered as follows: +// +// [foo, zoo, zoo], +// [foo, bar, zoo], +// [bar, bar, bar], +// [bar, bar, zoo], +// +// becomes: +// +// [bar, bar, bar], +// [bar, bar, zoo], +// [foo, bar, zoo], +// [foo, zoo, zoo], +// +// Be careful sorting group keys in result sets, because other columns +// associated with the group keys won't be sorted unless the correct `sort` methods +// are used on the result set implementations. +impl Ord for GroupKey<'_> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // two group keys must have same length + assert_eq!(self.0.len(), other.0.len()); + + let cols = self.0.len(); + for i in 0..cols { + match self.0[i].partial_cmp(&other.0[i]) { + Some(ord) => return ord, + None => continue, + } + } + + std::cmp::Ordering::Equal + } +} // A representation of a column name. pub type ColumnName<'a> = &'a str; @@ -857,6 +900,18 @@ impl ReadGroupResult<'_> { pub fn cardinality(&self) -> usize { self.group_keys.len() } + + /// Executes a mutable sort of the rows in the result set based on the + /// lexicographic order each group key column. This is useful for testing + /// because it allows you to compare `read_group` results. + pub fn sort(&mut self) { + // The permutation crate lets you execute a sort on anything implements + // `Ord` and return the sort order, which can then be applied to other + // columns. + let perm = permutation::sort(self.group_keys.as_slice()); + self.group_keys = perm.apply_slice(self.group_keys.as_slice()); + self.aggregates = perm.apply_slice(self.aggregates.as_slice()); + } } impl std::fmt::Debug for &ReadGroupResult<'_> { @@ -894,7 +949,7 @@ impl std::fmt::Display for &ReadGroupResult<'_> { } // write row for group by columns - for value in &self.group_keys[row] { + for value in self.group_keys[row].0.iter() { write!(f, "{},", value)?; } @@ -1184,7 +1239,8 @@ west,prod,GET ]; for (predicate, group_cols, aggs, expected) in cases { - let results = segment.read_group(&predicate, &group_cols, &aggs); + let mut results = segment.read_group(&predicate, &group_cols, &aggs); + results.sort(); assert_eq!(format!("{:?}", &results), expected); } } @@ -1357,11 +1413,11 @@ west,POST,304,101,203 group_columns, aggregate_columns, group_keys: vec![ - vec![Value::String("east"), Value::String("host-a")], - vec![Value::String("east"), Value::String("host-b")], - vec![Value::String("west"), Value::String("host-a")], - vec![Value::String("west"), Value::String("host-c")], - vec![Value::String("west"), Value::String("host-d")], + GroupKey(vec![Value::String("east"), Value::String("host-a")]), + GroupKey(vec![Value::String("east"), Value::String("host-b")]), + GroupKey(vec![Value::String("west"), Value::String("host-a")]), + GroupKey(vec![Value::String("west"), Value::String("host-c")]), + GroupKey(vec![Value::String("west"), Value::String("host-d")]), ], aggregates: vec![ vec![