From ab866073e30f7a9ecf046f7c06b336fb61576b48 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 26 Aug 2020 22:44:45 +0100 Subject: [PATCH] perf: faster group by with sorted cols --- delorean_mem_qe/src/bin/main.rs | 6 +- delorean_mem_qe/src/segment.rs | 351 +++++++++++++++++++++++++++++++- 2 files changed, 349 insertions(+), 8 deletions(-) diff --git a/delorean_mem_qe/src/bin/main.rs b/delorean_mem_qe/src/bin/main.rs index a58ce879d2..f89d7af645 100644 --- a/delorean_mem_qe/src/bin/main.rs +++ b/delorean_mem_qe/src/bin/main.rs @@ -509,10 +509,10 @@ fn time_group_by_multi_agg_count(store: &Store) { // fn time_group_by_multi_agg_sorted_count(store: &Store) { let strats = vec![ - // GroupingStrategy::HashGroup, - // GroupingStrategy::HashGroupConcurrent, + GroupingStrategy::HashGroup, + GroupingStrategy::HashGroupConcurrent, GroupingStrategy::SortGroup, - // GroupingStrategy::SortGroupConcurrent, + GroupingStrategy::SortGroupConcurrent, ]; for strat in &strats { diff --git a/delorean_mem_qe/src/segment.rs b/delorean_mem_qe/src/segment.rs index f3c6288df6..2e03f9d1d0 100644 --- a/delorean_mem_qe/src/segment.rs +++ b/delorean_mem_qe/src/segment.rs @@ -410,13 +410,37 @@ impl Segment { log::debug!("{:?}", hash_table); BTreeMap::new() } - pub fn aggregate_by_group_with_sort( &self, time_range: (i64, i64), predicates: &[(&str, Option<&column::Scalar>)], group_columns: &[String], aggregates: &[(String, AggregateType)], + ) -> BTreeMap, Vec<(String, column::Aggregate)>> { + if self.group_key_sorted(group_columns) { + log::info!("group key is already sorted {:?}", group_columns); + self.aggregate_by_group_with_sort_sorted( + time_range, + predicates, + group_columns, + aggregates, + ) + } else { + self.aggregate_by_group_with_sort_unsorted( + time_range, + predicates, + group_columns, + aggregates, + ) + } + } + + fn aggregate_by_group_with_sort_unsorted( + &self, + time_range: (i64, i64), + predicates: &[(&str, Option<&column::Scalar>)], + group_columns: &[String], + aggregates: &[(String, AggregateType)], ) -> BTreeMap, Vec<(String, column::Aggregate)>> { // filter on predicates and time let filtered_row_ids: croaring::Bitmap; @@ -490,7 +514,7 @@ impl Segment { let now = std::time::Instant::now(); if self.group_key_sorted(group_columns) { - log::debug!("segment already sorted by group key {:?}", group_columns); + panic!("This shouldn't be called!!!"); } else { // now sort on the first grouping columns. Right now the order doesn't matter... let group_col_sort_order = &(0..group_columns.len()).collect::>(); @@ -595,6 +619,281 @@ impl Segment { BTreeMap::new() } + // this method assumes that the segment's columns are sorted such that a + // sort of columns is not required. + fn aggregate_by_group_with_sort_sorted( + &self, + time_range: (i64, i64), + predicates: &[(&str, Option<&column::Scalar>)], + group_columns: &[String], + aggregates: &[(String, AggregateType)], + ) -> BTreeMap, Vec<(String, column::Aggregate)>> { + // filter on predicates and time + let filtered_row_ids: croaring::Bitmap; + if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) { + filtered_row_ids = row_ids; + } else { + return BTreeMap::new(); + } + let total_rows = &filtered_row_ids.cardinality(); + + let filtered_row_ids_vec = filtered_row_ids + .to_vec() + .iter() + .map(|v| *v as usize) + .collect::>(); + + // materialise all encoded values for the matching rows in the columns + // we are grouping on and store each group as an iterator. + let mut group_column_encoded_values = Vec::with_capacity(group_columns.len()); + for group_column in group_columns { + if let Some(column) = self.column(&group_column) { + let encoded_values = column.encoded_values(&filtered_row_ids_vec); + assert_eq!( + filtered_row_ids.cardinality() as usize, + encoded_values.len() + ); + + group_column_encoded_values.push(encoded_values); + } else { + panic!("need to handle no results for filtering/grouping..."); + } + } + + let mut new_agg_cols = Vec::with_capacity(aggregates.len()); + for (column_name, agg_type) in aggregates { + new_agg_cols.push((column_name, agg_type, self.column(&column_name))); + } + + let mut group_itrs = group_column_encoded_values + .iter() + .map(|vector| { + if let column::Vector::Integer(v) = vector { + v.iter() + } else { + panic!("don't support grouping on non-encoded values"); + } + }) + .collect::>(); + + // this tracks the last seen group key row. When it changes we can emit + // the grouped aggregates. + let mut last_group_row = group_itrs + .iter_mut() + .map(|itr| itr.next().unwrap()) + .collect::>(); + + let mut curr_group_row = last_group_row.clone(); + + let mut results = BTreeMap::new(); + let mut processed_rows = 1; + + let mut group_key_start_row_id = 0; + let mut group_size = 0; + + while processed_rows < *total_rows { + // update next group key. + let mut group_key_changed = false; + for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) { + let next_v = itr.next().unwrap(); + if curr_v != &next_v { + group_key_changed = true; + } + *curr_v = next_v; + } + + // group key changed - emit group row and aggregates. + if group_key_changed { + let mut group_key_aggregates = Vec::with_capacity(aggregates.len()); + for (name, agg_type, col) in &new_agg_cols { + if let Some(c) = col { + let agg_result = c.aggregate_by_id_range( + agg_type, + group_key_start_row_id, + group_key_start_row_id + group_size, + ); + group_key_aggregates.push((name, agg_result)); + } else { + panic!("figure this out"); + } + } + + let key = last_group_row.clone(); + results.insert(key, group_key_aggregates); + + // update group key + last_group_row = curr_group_row.clone(); + + // reset counters tracking group key row range + group_key_start_row_id = processed_rows as usize; // TODO(edd) - could be an off-by-one? + group_size = 0; + } + + group_size += 1; + processed_rows += 1; + } + + // Emit final row + let mut group_key_aggregates = Vec::with_capacity(aggregates.len()); + for (name, agg_type, col) in &new_agg_cols { + if let Some(c) = col { + let agg_result = c.aggregate_by_id_range( + agg_type, + group_key_start_row_id, + group_key_start_row_id + group_size, + ); + group_key_aggregates.push((name, agg_result)); + } else { + panic!("figure this out"); + } + } + + let key = last_group_row.clone(); + results.insert(key, group_key_aggregates); + + log::info!("({:?} rows processed) {:?}", processed_rows, results); + // results + BTreeMap::new() + } + + // this method assumes that the segment's columns are sorted such that a + // sort of columns is not required. + fn window_aggregate_with_sort_sorted( + &self, + time_range: (i64, i64), + predicates: &[(&str, Option<&column::Scalar>)], + group_columns: &[String], + aggregates: &[(String, AggregateType)], + window: i64, + ) -> BTreeMap, Vec<(String, column::Aggregate)>> { + // filter on predicates and time + let filtered_row_ids: croaring::Bitmap; + if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) { + filtered_row_ids = row_ids; + } else { + return BTreeMap::new(); + } + let total_rows = &filtered_row_ids.cardinality(); + + let filtered_row_ids_vec = filtered_row_ids + .to_vec() + .iter() + .map(|v| *v as usize) + .collect::>(); + + // materialise all encoded values for the matching rows in the columns + // we are grouping on and store each group as an iterator. + let mut group_column_encoded_values = Vec::with_capacity(group_columns.len()); + for group_column in group_columns { + if let Some(column) = self.column(&group_column) { + let encoded_values = column.encoded_values(&filtered_row_ids_vec); + assert_eq!( + filtered_row_ids.cardinality() as usize, + encoded_values.len() + ); + + group_column_encoded_values.push(encoded_values); + } else { + panic!("need to handle no results for filtering/grouping..."); + } + } + + let mut new_agg_cols = Vec::with_capacity(aggregates.len()); + for (column_name, agg_type) in aggregates { + new_agg_cols.push((column_name, agg_type, self.column(&column_name))); + } + + let mut group_itrs = group_column_encoded_values + .iter() + .map(|vector| { + if let column::Vector::Integer(v) = vector { + v.iter() + } else { + panic!("don't support grouping on non-encoded values"); + } + }) + .collect::>(); + + // this tracks the last seen group key row. When it changes we can emit + // the grouped aggregates. + let mut last_group_row = group_itrs + .iter_mut() + .map(|itr| itr.next().unwrap()) + .collect::>(); + + let mut curr_group_row = last_group_row.clone(); + + let mut results = BTreeMap::new(); + let mut processed_rows = 1; + + let mut group_key_start_row_id = 0; + let mut group_size = 0; + + while processed_rows < *total_rows { + // update next group key. + let mut group_key_changed = false; + for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) { + let next_v = itr.next().unwrap(); + if curr_v != &next_v { + group_key_changed = true; + } + *curr_v = next_v; + } + + // group key changed - emit group row and aggregates. + if group_key_changed { + let mut group_key_aggregates = Vec::with_capacity(aggregates.len()); + for (name, agg_type, col) in &new_agg_cols { + if let Some(c) = col { + let agg_result = c.aggregate_by_id_range( + agg_type, + group_key_start_row_id, + group_key_start_row_id + group_size, + ); + group_key_aggregates.push((name, agg_result)); + } else { + panic!("figure this out"); + } + } + + let key = last_group_row.clone(); + results.insert(key, group_key_aggregates); + + // update group key + last_group_row = curr_group_row.clone(); + + // reset counters tracking group key row range + group_key_start_row_id = processed_rows as usize; // TODO(edd) - could be an off-by-one? + group_size = 0; + } + + group_size += 1; + processed_rows += 1; + } + + // Emit final row + let mut group_key_aggregates = Vec::with_capacity(aggregates.len()); + for (name, agg_type, col) in &new_agg_cols { + if let Some(c) = col { + let agg_result = c.aggregate_by_id_range( + agg_type, + group_key_start_row_id, + group_key_start_row_id + group_size, + ); + group_key_aggregates.push((name, agg_result)); + } else { + panic!("figure this out"); + } + } + + let key = last_group_row.clone(); + results.insert(key, group_key_aggregates); + + log::info!("({:?} rows processed) {:?}", processed_rows, results); + // results + BTreeMap::new() + } + pub fn sum_column(&self, name: &str, row_ids: &mut croaring::Bitmap) -> Option { if let Some(c) = self.column(name) { return c.sum_by_ids(row_ids); @@ -1058,7 +1357,7 @@ impl<'a> Segments<'a> { &aggregates, ); log::info!( - "processed segment {:?} using multi-threaded hash-grouping in {:?}", + "processed segment {:?} using multi-threaded sort in {:?}", segment.time_range(), now.elapsed() ) @@ -1078,7 +1377,7 @@ impl<'a> Segments<'a> { &aggregates_arc.clone(), ); log::info!( - "processed segment {:?} using multi-threaded hash-grouping in {:?}", + "processed segment {:?} using multi-threaded sort in {:?}", segment.time_range(), now.elapsed() ) @@ -1099,7 +1398,7 @@ impl<'a> Segments<'a> { &aggregates, ); log::info!( - "processed segment {:?} using single-threaded hash-grouping in {:?}", + "processed segment {:?} using single-threaded sort in {:?}", segment.time_range(), now.elapsed() ) @@ -1129,6 +1428,48 @@ impl<'a> Segments<'a> { min_min } + pub fn window_agg_eq( + &self, + time_range: (i64, i64), + predicates: &[(&str, Option<&column::Scalar>)], + group_columns: Vec, + aggregates: Vec<(String, AggregateType)>, + strategy: &GroupingStrategy, + window: i64, + ) -> BTreeMap, Vec<((String, Aggregate), column::Aggregate)>> { + let (min, max) = time_range; + if max <= min { + panic!("max <= min"); + } + + match strategy { + GroupingStrategy::HashGroup => { + panic!("not yet"); + } + GroupingStrategy::HashGroupConcurrent => { + panic!("not yet"); + } + GroupingStrategy::SortGroup => { + return self.read_group_eq_sort( + time_range, + predicates, + group_columns, + aggregates, + false, + ) + } + GroupingStrategy::SortGroupConcurrent => { + panic!("not yet"); + } + } + + // TODO(edd): merge results - not expensive really... + // let mut cum_results: BTreeMap, Vec<((String, Aggregate), column::Aggregate)>> = + // BTreeMap::new(); + + // cum_results + } + /// Returns the maximum value for a column in a set of segments. pub fn column_max(&self, column_name: &str) -> Option { if self.segments.is_empty() {