From ee46c194c85ce4d773107e39ffc864177f90279f Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 27 Aug 2020 14:57:27 +0100 Subject: [PATCH] refactor: integrate windowing into grouping --- delorean_mem_qe/src/bin/main.rs | 65 +++++++++------ delorean_mem_qe/src/segment.rs | 142 +++++++++++++++++++++----------- 2 files changed, 134 insertions(+), 73 deletions(-) diff --git a/delorean_mem_qe/src/bin/main.rs b/delorean_mem_qe/src/bin/main.rs index e8bfc3dbd7..e3c631642f 100644 --- a/delorean_mem_qe/src/bin/main.rs +++ b/delorean_mem_qe/src/bin/main.rs @@ -120,10 +120,10 @@ fn build_store( match rb { Err(e) => println!("WARNING: error reading batch: {:?}, SKIPPING", e), Ok(Some(rb)) => { - // if i < 363 { - // i += 1; - // continue; - // } + if i < 363 { + i += 1; + continue; + } let schema = Schema::with_sort_order( rb.schema(), sort_order.iter().map(|s| s.to_string()).collect(), @@ -485,6 +485,7 @@ fn time_group_by_multi_agg_count(store: &Store) { &[], vec!["status".to_string(), "method".to_string()], vec![("counter".to_string(), AggregateType::Count)], + 0, strat, ); @@ -529,6 +530,7 @@ fn time_group_by_multi_agg_sorted_count(store: &Store) { &[], vec!["env".to_string(), "role".to_string()], vec![("counter".to_string(), AggregateType::Count)], + 0, strat, ); @@ -547,29 +549,40 @@ fn time_group_by_multi_agg_sorted_count(store: &Store) { } fn time_window_agg_sorted_count(store: &Store) { - let repeat = 10; - let mut total_time: std::time::Duration = std::time::Duration::new(0, 0); - let mut total_max = 0; - let segments = store.segments(); - for _ in 0..repeat { - let now = std::time::Instant::now(); + let strats = vec![ + // GroupingStrategy::HashGroup, + // GroupingStrategy::HashGroupConcurrent, + GroupingStrategy::SortGroup, + // GroupingStrategy::SortGroupConcurrent, + ]; - let groups = segments.window_agg_eq( - (1589000000000001, 1590044410000000), - &[], - vec!["env".to_string(), "role".to_string()], - vec![("counter".to_string(), AggregateType::Count)], - 60000000 * 10, // 10 minutes + for strat in &strats { + let repeat = 10; + let mut total_time: std::time::Duration = std::time::Duration::new(0, 0); + let mut total_max = 0; + let segments = store.segments(); + for _ in 0..repeat { + let now = std::time::Instant::now(); + + let groups = segments.read_group_eq( + (1589000000000001, 1590044410000000), + &[], + vec!["env".to_string(), "role".to_string()], + vec![("counter".to_string(), AggregateType::Count)], + 60000000 * 10, // 10 minutes, + strat, + ); + + total_time += now.elapsed(); + total_max += groups.len(); + } + println!( + "time_window_agg_sorted_count {:?} ran {:?} in {:?} {:?} / call {:?}", + strat, + repeat, + total_time, + total_time / repeat, + total_max ); - - total_time += now.elapsed(); - total_max += groups.len(); } - println!( - "time_window_agg_sorted_count ran {:?} in {:?} {:?} / call {:?}", - repeat, - total_time, - total_time / repeat, - total_max - ); } diff --git a/delorean_mem_qe/src/segment.rs b/delorean_mem_qe/src/segment.rs index e99c7e0181..bac015537e 100644 --- a/delorean_mem_qe/src/segment.rs +++ b/delorean_mem_qe/src/segment.rs @@ -417,6 +417,7 @@ impl Segment { predicates: &[(&str, Option<&column::Scalar>)], group_columns: &[String], aggregates: &[(String, AggregateType)], + window: i64, ) -> BTreeMap, Vec<(String, column::Aggregate)>> { if self.group_key_sorted(group_columns) { log::info!("group key is already sorted {:?}", group_columns); @@ -425,13 +426,16 @@ impl Segment { predicates, group_columns, aggregates, + window, ) } else { + log::info!("group key needs sorting {:?}", group_columns); self.aggregate_by_group_with_sort_unsorted( time_range, predicates, group_columns, aggregates, + window, ) } } @@ -442,7 +446,17 @@ impl Segment { predicates: &[(&str, Option<&column::Scalar>)], group_columns: &[String], aggregates: &[(String, AggregateType)], + window: i64, ) -> BTreeMap, Vec<(String, column::Aggregate)>> { + log::debug!("aggregate_by_group_with_sort_unsorted called"); + + if window > 0 { + // last column on group key should be time. + assert_eq!(group_columns[group_columns.len() - 1], "time"); + } else { + assert_ne!(group_columns[group_columns.len() - 1], "time"); + } + // filter on predicates and time let filtered_row_ids: croaring::Bitmap; if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) { @@ -543,9 +557,17 @@ impl Segment { // this tracks the last seen group key row. When it changes we can emit // the grouped aggregates. + let group_itrs_len = &group_itrs.len(); let mut last_group_row = group_itrs .iter_mut() - .map(|itr| itr.next().unwrap()) + .enumerate() + .map(|(i, itr)| { + if i == group_itrs_len - 1 && window > 0 { + // time column - apply window function + return itr.next().unwrap() / window * window; + } + *itr.next().unwrap() + }) .collect::>(); let mut curr_group_row = last_group_row.clone(); @@ -575,8 +597,17 @@ impl Segment { while processed_rows < *total_rows { // update next group key. let mut group_key_changed = false; - for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) { - let next_v = itr.next().unwrap(); + for (i, (curr_v, itr)) in curr_group_row + .iter_mut() + .zip(group_itrs.iter_mut()) + .enumerate() + { + let next_v = if i == group_itrs_len - 1 && window > 0 { + // time column - apply window function + itr.next().unwrap() / window * window + } else { + *itr.next().unwrap() + }; if curr_v != &next_v { group_key_changed = true; } @@ -615,7 +646,7 @@ impl Segment { // Emit final row results.insert(last_group_row, cum_aggregates); - log::debug!("{:?}", results); + log::info!("({:?} rows processed) {:?}", processed_rows, results); // results BTreeMap::new() } @@ -628,7 +659,17 @@ impl Segment { predicates: &[(&str, Option<&column::Scalar>)], group_columns: &[String], aggregates: &[(String, AggregateType)], + window: i64, ) -> BTreeMap, Vec<(String, column::Aggregate)>> { + log::debug!("aggregate_by_group_with_sort_sorted called"); + + if window > 0 { + // last column on group key should be time. + assert_eq!(group_columns[group_columns.len() - 1], "time"); + } else { + assert_ne!(group_columns[group_columns.len() - 1], "time"); + } + // filter on predicates and time let filtered_row_ids: croaring::Bitmap; if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) { @@ -679,9 +720,17 @@ impl Segment { // this tracks the last seen group key row. When it changes we can emit // the grouped aggregates. + let group_itrs_len = &group_itrs.len(); let mut last_group_row = group_itrs .iter_mut() - .map(|itr| itr.next().unwrap()) + .enumerate() + .map(|(i, itr)| { + if i == group_itrs_len - 1 && window > 0 { + // time column - apply window function + return itr.next().unwrap() / window * window; + } + *itr.next().unwrap() + }) .collect::>(); let mut curr_group_row = last_group_row.clone(); @@ -695,8 +744,17 @@ impl Segment { while processed_rows < *total_rows { // update next group key. let mut group_key_changed = false; - for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) { - let next_v = itr.next().unwrap(); + for (i, (curr_v, itr)) in curr_group_row + .iter_mut() + .zip(group_itrs.iter_mut()) + .enumerate() + { + let next_v = if i == group_itrs_len - 1 && window > 0 { + // time column - apply window function + itr.next().unwrap() / window * window + } else { + *itr.next().unwrap() + }; if curr_v != &next_v { group_key_changed = true; } @@ -749,7 +807,7 @@ impl Segment { } } - let key = last_group_row.clone(); + let key = last_group_row; results.insert(key, group_key_aggregates); log::info!("({:?} rows processed) {:?}", processed_rows, results); @@ -1447,6 +1505,7 @@ impl<'a> Segments<'a> { predicates: &[(&str, Option<&column::Scalar>)], group_columns: Vec, aggregates: Vec<(String, AggregateType)>, + window: i64, strategy: &GroupingStrategy, ) -> BTreeMap, Vec<((String, Aggregate), column::Aggregate)>> { let (min, max) = time_range; @@ -1456,48 +1515,28 @@ impl<'a> Segments<'a> { match strategy { GroupingStrategy::HashGroup => { - return self.read_group_eq_hash( - time_range, - predicates, - group_columns, - aggregates, - false, - ) + self.read_group_eq_hash(time_range, predicates, group_columns, aggregates, false) } GroupingStrategy::HashGroupConcurrent => { - return self.read_group_eq_hash( - time_range, - predicates, - group_columns, - aggregates, - true, - ) - } - GroupingStrategy::SortGroup => { - return self.read_group_eq_sort( - time_range, - predicates, - group_columns, - aggregates, - false, - ) - } - GroupingStrategy::SortGroupConcurrent => { - return self.read_group_eq_sort( - time_range, - predicates, - group_columns, - aggregates, - true, - ) + self.read_group_eq_hash(time_range, predicates, group_columns, aggregates, true) } + GroupingStrategy::SortGroup => self.read_group_eq_sort( + time_range, + predicates, + group_columns, + aggregates, + window, + false, + ), + GroupingStrategy::SortGroupConcurrent => self.read_group_eq_sort( + time_range, + predicates, + group_columns, + aggregates, + window, + true, + ), } - - // TODO(edd): merge results - not expensive really... - // let mut cum_results: BTreeMap, Vec<((String, Aggregate), column::Aggregate)>> = - // BTreeMap::new(); - - // cum_results } fn read_group_eq_hash( @@ -1581,10 +1620,16 @@ impl<'a> Segments<'a> { &self, time_range: (i64, i64), predicates: &[(&str, Option<&column::Scalar>)], - group_columns: Vec, + mut group_columns: Vec, aggregates: Vec<(String, AggregateType)>, + window: i64, concurrent: bool, ) -> BTreeMap, Vec<((String, Aggregate), column::Aggregate)>> { + if window > 0 { + // add time column to the group key + group_columns.push("time".to_string()); + } + if concurrent { let group_columns_arc = std::sync::Arc::new(group_columns); let aggregates_arc = std::sync::Arc::new(aggregates); @@ -1602,6 +1647,7 @@ impl<'a> Segments<'a> { predicates, &group_columns, &aggregates, + window, ); log::info!( "processed segment {:?} using multi-threaded sort in {:?}", @@ -1622,6 +1668,7 @@ impl<'a> Segments<'a> { predicates, &group_columns_arc.clone(), &aggregates_arc.clone(), + window, ); log::info!( "processed segment {:?} using multi-threaded sort in {:?}", @@ -1643,6 +1690,7 @@ impl<'a> Segments<'a> { predicates, &group_columns, &aggregates, + window, ); log::info!( "processed segment {:?} using single-threaded sort in {:?}",