perf: faster group by with sorted cols

parent f588b9ff61
commit ab866073e3
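This commit adds a sort-based group-by path to `Segment`: `aggregate_by_group_with_sort` checks whether the segment is already sorted by the group key and, if so, takes a fast path (`aggregate_by_group_with_sort_sorted`) that aggregates in a single pass with no re-sort; otherwise it falls back to `aggregate_by_group_with_sort_unsorted`. A windowed variant and a cross-segment entry point (`window_agg_eq`) are also added, and the benchmark now exercises the previously commented-out grouping strategies.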
@@ -509,10 +509,10 @@ fn time_group_by_multi_agg_count(store: &Store) {
//
fn time_group_by_multi_agg_sorted_count(store: &Store) {
    let strats = vec![
-       // GroupingStrategy::HashGroup,
-       // GroupingStrategy::HashGroupConcurrent,
+       GroupingStrategy::HashGroup,
+       GroupingStrategy::HashGroupConcurrent,
        GroupingStrategy::SortGroup,
-       // GroupingStrategy::SortGroupConcurrent,
+       GroupingStrategy::SortGroupConcurrent,
    ];

    for strat in &strats {
@@ -410,13 +410,37 @@ impl Segment {
        log::debug!("{:?}", hash_table);
        BTreeMap::new()
    }

    pub fn aggregate_by_group_with_sort(
        &self,
        time_range: (i64, i64),
        predicates: &[(&str, Option<&column::Scalar>)],
        group_columns: &[String],
        aggregates: &[(String, AggregateType)],
    ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
        if self.group_key_sorted(group_columns) {
            log::info!("group key is already sorted {:?}", group_columns);
            self.aggregate_by_group_with_sort_sorted(
                time_range,
                predicates,
                group_columns,
                aggregates,
            )
        } else {
            self.aggregate_by_group_with_sort_unsorted(
                time_range,
                predicates,
                group_columns,
                aggregates,
            )
        }
    }

    fn aggregate_by_group_with_sort_unsorted(
        &self,
        time_range: (i64, i64),
        predicates: &[(&str, Option<&column::Scalar>)],
        group_columns: &[String],
        aggregates: &[(String, AggregateType)],
    ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
        // filter on predicates and time
        let filtered_row_ids: croaring::Bitmap;
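Callers therefore don't need to know whether a segment is sorted by the group key; the unsorted fallback (continued below) sorts on the grouping columns first, then runs the same scan.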
@@ -490,7 +514,7 @@ impl Segment {

        let now = std::time::Instant::now();
        if self.group_key_sorted(group_columns) {
-           log::debug!("segment already sorted by group key {:?}", group_columns);
+           panic!("This shouldn't be called!!!");
        } else {
            // now sort on the first grouping columns. Right now the order doesn't matter...
            let group_col_sort_order = &(0..group_columns.len()).collect::<Vec<_>>();
@@ -595,6 +619,281 @@ impl Segment {
        BTreeMap::new()
    }

    // this method assumes that the segment's columns are sorted such that a
    // sort of columns is not required.
    fn aggregate_by_group_with_sort_sorted(
        &self,
        time_range: (i64, i64),
        predicates: &[(&str, Option<&column::Scalar>)],
        group_columns: &[String],
        aggregates: &[(String, AggregateType)],
    ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
        // filter on predicates and time
        let filtered_row_ids: croaring::Bitmap;
        if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) {
            filtered_row_ids = row_ids;
        } else {
            return BTreeMap::new();
        }
        let total_rows = &filtered_row_ids.cardinality();

        let filtered_row_ids_vec = filtered_row_ids
            .to_vec()
            .iter()
            .map(|v| *v as usize)
            .collect::<Vec<_>>();

        // materialise all encoded values for the matching rows in the columns
        // we are grouping on and store each group as an iterator.
        let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
        for group_column in group_columns {
            if let Some(column) = self.column(&group_column) {
                let encoded_values = column.encoded_values(&filtered_row_ids_vec);
                assert_eq!(
                    filtered_row_ids.cardinality() as usize,
                    encoded_values.len()
                );

                group_column_encoded_values.push(encoded_values);
            } else {
                panic!("need to handle no results for filtering/grouping...");
            }
        }

        let mut new_agg_cols = Vec::with_capacity(aggregates.len());
        for (column_name, agg_type) in aggregates {
            new_agg_cols.push((column_name, agg_type, self.column(&column_name)));
        }

        let mut group_itrs = group_column_encoded_values
            .iter()
            .map(|vector| {
                if let column::Vector::Integer(v) = vector {
                    v.iter()
                } else {
                    panic!("don't support grouping on non-encoded values");
                }
            })
            .collect::<Vec<_>>();

        // this tracks the last seen group key row. When it changes we can emit
        // the grouped aggregates.
        let mut last_group_row = group_itrs
            .iter_mut()
            .map(|itr| itr.next().unwrap())
            .collect::<Vec<_>>();

        let mut curr_group_row = last_group_row.clone();

        let mut results = BTreeMap::new();
        let mut processed_rows = 1;

        let mut group_key_start_row_id = 0;
        let mut group_size = 0;

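        // Scan the sorted rows once. Because the segment is sorted by the
        // group key, all rows for a key are contiguous, so each group can be
        // aggregated from its (start row id, length) range as soon as the key
        // changes.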
        while processed_rows < *total_rows {
            // update next group key.
            let mut group_key_changed = false;
            for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) {
                let next_v = itr.next().unwrap();
                if curr_v != &next_v {
                    group_key_changed = true;
                }
                *curr_v = next_v;
            }

            // group key changed - emit group row and aggregates.
            if group_key_changed {
                let mut group_key_aggregates = Vec::with_capacity(aggregates.len());
                for (name, agg_type, col) in &new_agg_cols {
                    if let Some(c) = col {
                        let agg_result = c.aggregate_by_id_range(
                            agg_type,
                            group_key_start_row_id,
                            group_key_start_row_id + group_size,
                        );
                        group_key_aggregates.push((name, agg_result));
                    } else {
                        panic!("figure this out");
                    }
                }

                let key = last_group_row.clone();
                results.insert(key, group_key_aggregates);

                // update group key
                last_group_row = curr_group_row.clone();

                // reset counters tracking group key row range
                group_key_start_row_id = processed_rows as usize; // TODO(edd) - could be an off-by-one?
                group_size = 0;
            }

            group_size += 1;
            processed_rows += 1;
        }

        // Emit final row
        let mut group_key_aggregates = Vec::with_capacity(aggregates.len());
        for (name, agg_type, col) in &new_agg_cols {
            if let Some(c) = col {
                let agg_result = c.aggregate_by_id_range(
                    agg_type,
                    group_key_start_row_id,
                    group_key_start_row_id + group_size,
                );
                group_key_aggregates.push((name, agg_result));
            } else {
                panic!("figure this out");
            }
        }

        let key = last_group_row.clone();
        results.insert(key, group_key_aggregates);

        log::info!("({:?} rows processed) {:?}", processed_rows, results);
        // results
        BTreeMap::new()
    }

    // this method assumes that the segment's columns are sorted such that a
    // sort of columns is not required.
    fn window_aggregate_with_sort_sorted(
        &self,
        time_range: (i64, i64),
        predicates: &[(&str, Option<&column::Scalar>)],
        group_columns: &[String],
        aggregates: &[(String, AggregateType)],
        window: i64,
    ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
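        // NOTE: the body below mirrors `aggregate_by_group_with_sort_sorted`;
        // the `window` argument is not yet folded into the group key at this
        // point in the commit.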
        // filter on predicates and time
        let filtered_row_ids: croaring::Bitmap;
        if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) {
            filtered_row_ids = row_ids;
        } else {
            return BTreeMap::new();
        }
        let total_rows = &filtered_row_ids.cardinality();

        let filtered_row_ids_vec = filtered_row_ids
            .to_vec()
            .iter()
            .map(|v| *v as usize)
            .collect::<Vec<_>>();

        // materialise all encoded values for the matching rows in the columns
        // we are grouping on and store each group as an iterator.
        let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
        for group_column in group_columns {
            if let Some(column) = self.column(&group_column) {
                let encoded_values = column.encoded_values(&filtered_row_ids_vec);
                assert_eq!(
                    filtered_row_ids.cardinality() as usize,
                    encoded_values.len()
                );

                group_column_encoded_values.push(encoded_values);
            } else {
                panic!("need to handle no results for filtering/grouping...");
            }
        }

        let mut new_agg_cols = Vec::with_capacity(aggregates.len());
        for (column_name, agg_type) in aggregates {
            new_agg_cols.push((column_name, agg_type, self.column(&column_name)));
        }

        let mut group_itrs = group_column_encoded_values
            .iter()
            .map(|vector| {
                if let column::Vector::Integer(v) = vector {
                    v.iter()
                } else {
                    panic!("don't support grouping on non-encoded values");
                }
            })
            .collect::<Vec<_>>();

        // this tracks the last seen group key row. When it changes we can emit
        // the grouped aggregates.
        let mut last_group_row = group_itrs
            .iter_mut()
            .map(|itr| itr.next().unwrap())
            .collect::<Vec<_>>();

        let mut curr_group_row = last_group_row.clone();

        let mut results = BTreeMap::new();
        let mut processed_rows = 1;

        let mut group_key_start_row_id = 0;
        let mut group_size = 0;

        while processed_rows < *total_rows {
            // update next group key.
            let mut group_key_changed = false;
            for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) {
                let next_v = itr.next().unwrap();
                if curr_v != &next_v {
                    group_key_changed = true;
                }
                *curr_v = next_v;
            }

            // group key changed - emit group row and aggregates.
            if group_key_changed {
                let mut group_key_aggregates = Vec::with_capacity(aggregates.len());
                for (name, agg_type, col) in &new_agg_cols {
                    if let Some(c) = col {
                        let agg_result = c.aggregate_by_id_range(
                            agg_type,
                            group_key_start_row_id,
                            group_key_start_row_id + group_size,
                        );
                        group_key_aggregates.push((name, agg_result));
                    } else {
                        panic!("figure this out");
                    }
                }

                let key = last_group_row.clone();
                results.insert(key, group_key_aggregates);

                // update group key
                last_group_row = curr_group_row.clone();

                // reset counters tracking group key row range
                group_key_start_row_id = processed_rows as usize; // TODO(edd) - could be an off-by-one?
                group_size = 0;
            }

            group_size += 1;
            processed_rows += 1;
        }

        // Emit final row
        let mut group_key_aggregates = Vec::with_capacity(aggregates.len());
        for (name, agg_type, col) in &new_agg_cols {
            if let Some(c) = col {
                let agg_result = c.aggregate_by_id_range(
                    agg_type,
                    group_key_start_row_id,
                    group_key_start_row_id + group_size,
                );
                group_key_aggregates.push((name, agg_result));
            } else {
                panic!("figure this out");
            }
        }

        let key = last_group_row.clone();
        results.insert(key, group_key_aggregates);

        log::info!("({:?} rows processed) {:?}", processed_rows, results);
        // results
        BTreeMap::new()
    }

    pub fn sum_column(&self, name: &str, row_ids: &mut croaring::Bitmap) -> Option<column::Scalar> {
        if let Some(c) = self.column(name) {
            return c.sum_by_ids(row_ids);
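The fast path works because sorted input turns grouping into run detection: a single forward scan, no hash table, and each group aggregated over a contiguous row-id range via `aggregate_by_id_range`. (At this stage both functions only log `results` and still return an empty `BTreeMap`.) A minimal, self-contained sketch of the run-detection pattern, using flat `u32` keys and `i64` values as illustrative stand-ins for the encoded column API:

    use std::collections::BTreeMap;

    /// Sum `values` per contiguous run of identical keys in `keys`,
    /// assuming `keys` is already sorted.
    fn sum_by_sorted_key(keys: &[u32], values: &[i64]) -> BTreeMap<u32, i64> {
        let mut results = BTreeMap::new();
        if keys.is_empty() {
            return results;
        }
        let mut run_start = 0;
        let mut last_key = keys[0];
        for (i, &key) in keys.iter().enumerate().skip(1) {
            if key != last_key {
                // key changed: aggregate the finished run [run_start, i)
                results.insert(last_key, values[run_start..i].iter().sum());
                run_start = i;
                last_key = key;
            }
        }
        // emit the final run
        results.insert(last_key, values[run_start..].iter().sum());
        results
    }

For example, `sum_by_sorted_key(&[1, 1, 2], &[10, 20, 30])` yields `{1: 30, 2: 30}`.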
@@ -1058,7 +1357,7 @@ impl<'a> Segments<'a> {
            &aggregates,
        );
        log::info!(
-           "processed segment {:?} using multi-threaded hash-grouping in {:?}",
+           "processed segment {:?} using multi-threaded sort in {:?}",
            segment.time_range(),
            now.elapsed()
        )
@@ -1078,7 +1377,7 @@ impl<'a> Segments<'a> {
            &aggregates_arc.clone(),
        );
        log::info!(
-           "processed segment {:?} using multi-threaded hash-grouping in {:?}",
+           "processed segment {:?} using multi-threaded sort in {:?}",
            segment.time_range(),
            now.elapsed()
        )
@@ -1099,7 +1398,7 @@ impl<'a> Segments<'a> {
            &aggregates,
        );
        log::info!(
-           "processed segment {:?} using single-threaded hash-grouping in {:?}",
+           "processed segment {:?} using single-threaded sort in {:?}",
            segment.time_range(),
            now.elapsed()
        )
@@ -1129,6 +1428,48 @@ impl<'a> Segments<'a> {
        min_min
    }

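    // Cross-segment entry point for windowed aggregates. Only the
    // single-threaded sort strategy is wired up so far; the other grouping
    // strategies still panic.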
    pub fn window_agg_eq(
        &self,
        time_range: (i64, i64),
        predicates: &[(&str, Option<&column::Scalar>)],
        group_columns: Vec<String>,
        aggregates: Vec<(String, AggregateType)>,
        strategy: &GroupingStrategy,
        window: i64,
    ) -> BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> {
        let (min, max) = time_range;
        if max <= min {
            panic!("max <= min");
        }

        match strategy {
            GroupingStrategy::HashGroup => {
                panic!("not yet");
            }
            GroupingStrategy::HashGroupConcurrent => {
                panic!("not yet");
            }
            GroupingStrategy::SortGroup => {
                return self.read_group_eq_sort(
                    time_range,
                    predicates,
                    group_columns,
                    aggregates,
                    false,
                )
            }
            GroupingStrategy::SortGroupConcurrent => {
                panic!("not yet");
            }
        }

        // TODO(edd): merge results - not expensive really...
        // let mut cum_results: BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> =
        //     BTreeMap::new();

        // cum_results
    }
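    // Illustrative call shape (hypothetical values; only `SortGroup` avoids
    // the "not yet" panics above):
    //
    //     let results = segments.window_agg_eq(
    //         (0, 86_400_000_000_000),      // time range
    //         &[],                          // no column predicates
    //         vec!["region".to_string()],   // group columns
    //         vec![("counter".to_string(), AggregateType::Count)],
    //         &GroupingStrategy::SortGroup,
    //         3_600_000_000_000,            // 1h window
    //     );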

    /// Returns the maximum value for a column in a set of segments.
    pub fn column_max(&self, column_name: &str) -> Option<column::Scalar> {
        if self.segments.is_empty() {