refactor: integrate windowing into grouping

pull/24376/head
Edd Robinson 2020-08-27 14:57:27 +01:00
parent 4a153f5f7d
commit ee46c194c8
2 changed files with 134 additions and 73 deletions

@@ -120,10 +120,10 @@ fn build_store(
         match rb {
             Err(e) => println!("WARNING: error reading batch: {:?}, SKIPPING", e),
             Ok(Some(rb)) => {
-                // if i < 363 {
-                //     i += 1;
-                //     continue;
-                // }
+                if i < 363 {
+                    i += 1;
+                    continue;
+                }
                 let schema = Schema::with_sort_order(
                     rb.schema(),
                     sort_order.iter().map(|s| s.to_string()).collect(),
@@ -485,6 +485,7 @@ fn time_group_by_multi_agg_count(store: &Store) {
                 &[],
                 vec!["status".to_string(), "method".to_string()],
                 vec![("counter".to_string(), AggregateType::Count)],
+                0,
                 strat,
             );
@@ -529,6 +530,7 @@ fn time_group_by_multi_agg_sorted_count(store: &Store) {
                 &[],
                 vec!["env".to_string(), "role".to_string()],
                 vec![("counter".to_string(), AggregateType::Count)],
+                0,
                 strat,
            );
@@ -547,29 +549,40 @@ fn time_group_by_multi_agg_sorted_count(store: &Store) {
 }
 fn time_window_agg_sorted_count(store: &Store) {
-    let repeat = 10;
-    let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
-    let mut total_max = 0;
-    let segments = store.segments();
-    for _ in 0..repeat {
-        let now = std::time::Instant::now();
-        let groups = segments.window_agg_eq(
-            (1589000000000001, 1590044410000000),
-            &[],
-            vec!["env".to_string(), "role".to_string()],
-            vec![("counter".to_string(), AggregateType::Count)],
-            60000000 * 10, // 10 minutes
-        );
-        total_time += now.elapsed();
-        total_max += groups.len();
-    }
-    println!(
-        "time_window_agg_sorted_count ran {:?} in {:?} {:?} / call {:?}",
-        repeat,
-        total_time,
-        total_time / repeat,
-        total_max
-    );
+    let strats = vec![
+        // GroupingStrategy::HashGroup,
+        // GroupingStrategy::HashGroupConcurrent,
+        GroupingStrategy::SortGroup,
+        // GroupingStrategy::SortGroupConcurrent,
+    ];
+    for strat in &strats {
+        let repeat = 10;
+        let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
+        let mut total_max = 0;
+        let segments = store.segments();
+        for _ in 0..repeat {
+            let now = std::time::Instant::now();
+            let groups = segments.read_group_eq(
+                (1589000000000001, 1590044410000000),
+                &[],
+                vec!["env".to_string(), "role".to_string()],
+                vec![("counter".to_string(), AggregateType::Count)],
+                60000000 * 10, // 10 minutes
+                strat,
+            );
+            total_time += now.elapsed();
+            total_max += groups.len();
+        }
+        println!(
+            "time_window_agg_sorted_count {:?} ran {:?} in {:?} {:?} / call {:?}",
+            strat,
+            repeat,
+            total_time,
+            total_time / repeat,
+            total_max
+        );
+    }
 }
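
For reference: the constants above imply microsecond-precision timestamps (the range starting at 1589000000000001 lands in May 2020 when read as microseconds), so 60000000 is one minute and the window argument 60000000 * 10 is ten minutes. A standalone sketch of the bucketing arithmetic the windowed path applies to the time column, where integer division truncates each timestamp to the start of its window:

fn window_start(ts: i64, window: i64) -> i64 {
    // Integer division truncates toward zero, which acts as a floor for the
    // positive timestamps used in these benchmarks.
    ts / window * window
}

fn main() {
    let window = 60_000_000 * 10; // 10 minutes in microseconds
    let ts = 1_589_000_123_456_789_i64; // inside the benchmark's time range
    let start = window_start(ts, window);
    assert_eq!(start % window, 0); // bucket starts align to window boundaries
    assert!(ts - start < window); // the timestamp falls inside its bucket
    println!("{} buckets to {}", ts, start);
}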

@@ -417,6 +417,7 @@ impl Segment {
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: &[String],
         aggregates: &[(String, AggregateType)],
+        window: i64,
     ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
         if self.group_key_sorted(group_columns) {
             log::info!("group key is already sorted {:?}", group_columns);
@@ -425,13 +426,16 @@
                 predicates,
                 group_columns,
                 aggregates,
+                window,
             )
         } else {
             log::info!("group key needs sorting {:?}", group_columns);
             self.aggregate_by_group_with_sort_unsorted(
                 time_range,
                 predicates,
                 group_columns,
                 aggregates,
+                window,
             )
         }
     }
@@ -442,7 +446,17 @@
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: &[String],
         aggregates: &[(String, AggregateType)],
+        window: i64,
     ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
+        log::debug!("aggregate_by_group_with_sort_unsorted called");
+
+        if window > 0 {
+            // last column on group key should be time.
+            assert_eq!(group_columns[group_columns.len() - 1], "time");
+        } else {
+            assert_ne!(group_columns[group_columns.len() - 1], "time");
+        }
+
         // filter on predicates and time
         let filtered_row_ids: croaring::Bitmap;
         if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) {
@@ -543,9 +557,17 @@
         // this tracks the last seen group key row. When it changes we can emit
         // the grouped aggregates.
+        let group_itrs_len = &group_itrs.len();
         let mut last_group_row = group_itrs
             .iter_mut()
-            .map(|itr| itr.next().unwrap())
+            .enumerate()
+            .map(|(i, itr)| {
+                if i == group_itrs_len - 1 && window > 0 {
+                    // time column - apply window function
+                    return itr.next().unwrap() / window * window;
+                }
+                *itr.next().unwrap()
+            })
             .collect::<Vec<_>>();
         let mut curr_group_row = last_group_row.clone();
@@ -575,8 +597,17 @@
         while processed_rows < *total_rows {
             // update next group key.
             let mut group_key_changed = false;
-            for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) {
-                let next_v = itr.next().unwrap();
+            for (i, (curr_v, itr)) in curr_group_row
+                .iter_mut()
+                .zip(group_itrs.iter_mut())
+                .enumerate()
+            {
+                let next_v = if i == group_itrs_len - 1 && window > 0 {
+                    // time column - apply window function
+                    itr.next().unwrap() / window * window
+                } else {
+                    *itr.next().unwrap()
+                };
                 if curr_v != &next_v {
                     group_key_changed = true;
                 }
@@ -615,7 +646,7 @@
         // Emit final row
         results.insert(last_group_row, cum_aggregates);
-        log::debug!("{:?}", results);
+        log::info!("({:?} rows processed) {:?}", processed_rows, results);
         // results
         BTreeMap::new()
     }
@@ -628,7 +659,17 @@
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: &[String],
         aggregates: &[(String, AggregateType)],
+        window: i64,
     ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
+        log::debug!("aggregate_by_group_with_sort_sorted called");
+
+        if window > 0 {
+            // last column on group key should be time.
+            assert_eq!(group_columns[group_columns.len() - 1], "time");
+        } else {
+            assert_ne!(group_columns[group_columns.len() - 1], "time");
+        }
+
         // filter on predicates and time
         let filtered_row_ids: croaring::Bitmap;
         if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) {
@@ -679,9 +720,17 @@
         // this tracks the last seen group key row. When it changes we can emit
         // the grouped aggregates.
+        let group_itrs_len = &group_itrs.len();
         let mut last_group_row = group_itrs
             .iter_mut()
-            .map(|itr| itr.next().unwrap())
+            .enumerate()
+            .map(|(i, itr)| {
+                if i == group_itrs_len - 1 && window > 0 {
+                    // time column - apply window function
+                    return itr.next().unwrap() / window * window;
+                }
+                *itr.next().unwrap()
+            })
             .collect::<Vec<_>>();
         let mut curr_group_row = last_group_row.clone();
@@ -695,8 +744,17 @@
         while processed_rows < *total_rows {
             // update next group key.
             let mut group_key_changed = false;
-            for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) {
-                let next_v = itr.next().unwrap();
+            for (i, (curr_v, itr)) in curr_group_row
+                .iter_mut()
+                .zip(group_itrs.iter_mut())
+                .enumerate()
+            {
+                let next_v = if i == group_itrs_len - 1 && window > 0 {
+                    // time column - apply window function
+                    itr.next().unwrap() / window * window
+                } else {
+                    *itr.next().unwrap()
+                };
                 if curr_v != &next_v {
                     group_key_changed = true;
                 }
@@ -749,7 +807,7 @@
             }
         }
-        let key = last_group_row.clone();
+        let key = last_group_row;
         results.insert(key, group_key_aggregates);
         log::info!("({:?} rows processed) {:?}", processed_rows, results);
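
One detail in the hunk above: last_group_row is now moved into the results map instead of cloned, which is safe because the final insert is its last use. A trivial standalone illustration of the same pattern:

use std::collections::BTreeMap;

fn main() {
    let last_group_row = vec![100_i64, 200, 1_589_000_400_000_000];
    let mut results = BTreeMap::new();
    // Move rather than clone: no extra allocation, and the borrow checker
    // rejects any later use of last_group_row.
    results.insert(last_group_row, "aggregates");
    println!("{:?}", results);
}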
@@ -1447,6 +1505,7 @@ impl<'a> Segments<'a> {
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: Vec<String>,
         aggregates: Vec<(String, AggregateType)>,
+        window: i64,
         strategy: &GroupingStrategy,
     ) -> BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> {
         let (min, max) = time_range;
@@ -1456,48 +1515,28 @@
         match strategy {
             GroupingStrategy::HashGroup => {
-                return self.read_group_eq_hash(
-                    time_range,
-                    predicates,
-                    group_columns,
-                    aggregates,
-                    false,
-                )
+                self.read_group_eq_hash(time_range, predicates, group_columns, aggregates, false)
             }
             GroupingStrategy::HashGroupConcurrent => {
-                return self.read_group_eq_hash(
-                    time_range,
-                    predicates,
-                    group_columns,
-                    aggregates,
-                    true,
-                )
-            }
-            GroupingStrategy::SortGroup => {
-                return self.read_group_eq_sort(
-                    time_range,
-                    predicates,
-                    group_columns,
-                    aggregates,
-                    false,
-                )
-            }
-            GroupingStrategy::SortGroupConcurrent => {
-                return self.read_group_eq_sort(
-                    time_range,
-                    predicates,
-                    group_columns,
-                    aggregates,
-                    true,
-                )
+                self.read_group_eq_hash(time_range, predicates, group_columns, aggregates, true)
             }
+            GroupingStrategy::SortGroup => self.read_group_eq_sort(
+                time_range,
+                predicates,
+                group_columns,
+                aggregates,
+                window,
+                false,
+            ),
+            GroupingStrategy::SortGroupConcurrent => self.read_group_eq_sort(
+                time_range,
+                predicates,
+                group_columns,
+                aggregates,
+                window,
+                true,
+            ),
         }
 
         // TODO(edd): merge results - not expensive really...
         // let mut cum_results: BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> =
        //     BTreeMap::new();
        // cum_results
     }
 
     fn read_group_eq_hash(
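
Worth noting in the dispatch above: only the sort-based strategies pass the new window argument through; the hash-grouping paths keep their old signatures, so in this commit a windowed query is only honoured when a sort strategy is selected (which is also why the benchmark enables only GroupingStrategy::SortGroup). A standalone sketch of that property, using a hypothetical helper over a mirror of the enum:

enum GroupingStrategy {
    HashGroup,
    HashGroupConcurrent,
    SortGroup,
    SortGroupConcurrent,
}

// Only the sort paths receive a window in this commit.
fn supports_window(strategy: &GroupingStrategy) -> bool {
    matches!(
        strategy,
        GroupingStrategy::SortGroup | GroupingStrategy::SortGroupConcurrent
    )
}

fn main() {
    assert!(supports_window(&GroupingStrategy::SortGroup));
    assert!(!supports_window(&GroupingStrategy::HashGroup));
}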
@@ -1581,10 +1620,16 @@
         &self,
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
-        group_columns: Vec<String>,
+        mut group_columns: Vec<String>,
         aggregates: Vec<(String, AggregateType)>,
+        window: i64,
         concurrent: bool,
     ) -> BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> {
+        if window > 0 {
+            // add time column to the group key
+            group_columns.push("time".to_string());
+        }
+
         if concurrent {
             let group_columns_arc = std::sync::Arc::new(group_columns);
             let aggregates_arc = std::sync::Arc::new(aggregates);
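
The mut group_columns change above is the heart of the integration: when a window is requested, the time column is appended to the group key, so each emitted group becomes (tag values..., window start), and the asserts in the Segment methods enforce that time sits last. A standalone sketch of the key construction, under the same convention that window == 0 means no windowing (as at the benchmark call sites that pass 0):

fn effective_group_key(mut group_columns: Vec<String>, window: i64) -> Vec<String> {
    if window > 0 {
        // Windowed: time joins the group key; downstream it is truncated
        // to window boundaries rather than used raw.
        group_columns.push("time".to_string());
    }
    group_columns
}

fn main() {
    let tags = vec!["env".to_string(), "role".to_string()];
    assert_eq!(effective_group_key(tags.clone(), 0).len(), 2);
    let windowed = effective_group_key(tags, 600_000_000);
    assert_eq!(windowed.last().unwrap(), "time");
}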
@@ -1602,6 +1647,7 @@
                         predicates,
                         &group_columns,
                         &aggregates,
+                        window,
                     );
                     log::info!(
                         "processed segment {:?} using multi-threaded sort in {:?}",
@@ -1622,6 +1668,7 @@
                         predicates,
                         &group_columns_arc.clone(),
                         &aggregates_arc.clone(),
+                        window,
                     );
                     log::info!(
                         "processed segment {:?} using multi-threaded sort in {:?}",
@@ -1643,6 +1690,7 @@
                 predicates,
                 &group_columns,
                 &aggregates,
+                window,
             );
             log::info!(
                 "processed segment {:?} using single-threaded sort in {:?}",