Merge pull request #572 from influxdata/er/feat/segment-read-group-hash-perf
perf: Improve `read_group` performance with faster hash functionpull/24376/head
commit
5ed977f2d1
|
@ -2793,6 +2793,7 @@ dependencies = [
|
|||
"croaring",
|
||||
"data_types",
|
||||
"either",
|
||||
"hashbrown",
|
||||
"itertools 0.9.0",
|
||||
"packers",
|
||||
"permutation",
|
||||
|
|
|
@ -15,6 +15,7 @@ croaring = "0.4.5"
|
|||
itertools = "0.9.0"
|
||||
either = "1.6.1"
|
||||
permutation = "0.2.5"
|
||||
hashbrown = "0.9.1"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3.3"
|
||||
|
|
|
@ -2558,6 +2558,14 @@ impl EncodedValues {
|
|||
panic!("cannot borrow &Vec<u32>");
|
||||
}
|
||||
|
||||
/// Takes a `Vec<u32>` out of the enum.
|
||||
pub fn take_u32(&mut self) -> Vec<u32> {
|
||||
std::mem::take(match self {
|
||||
Self::I64(a) => panic!("cannot take Vec<u32> out of I64 variant"),
|
||||
Self::U32(arr) => arr,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
match self {
|
||||
Self::I64(v) => v.len(),
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{BTreeMap, HashMap},
|
||||
};
|
||||
use std::{borrow::Cow, collections::BTreeMap};
|
||||
|
||||
use hashbrown::{hash_map, HashMap};
|
||||
use itertools::Itertools;
|
||||
|
||||
use arrow_deps::arrow::datatypes::SchemaRef;
|
||||
|
@ -338,10 +336,14 @@ impl Segment {
|
|||
)
|
||||
}
|
||||
|
||||
/// Right now, predicates are treated as conjunctive (AND) predicates.
|
||||
/// Returns a set of group keys and aggregated column data associated with
|
||||
/// them. `read_group` currently only supports grouping on columns that have
|
||||
/// integer encoded representations - typically "tag columns".
|
||||
///
|
||||
/// Right now, predicates are treated conjunctive (AND) predicates.
|
||||
/// `read_group` does not guarantee any sort order. Ordering of results
|
||||
/// should be handled high up in the `Table` section of the segment
|
||||
/// store, where multiple segment results may need to be merged.
|
||||
/// should be handled high up in the `Table` section of the segment store,
|
||||
/// where multiple segment results may need to be merged.
|
||||
pub fn read_group(
|
||||
&self,
|
||||
predicates: &[Predicate<'_>],
|
||||
|
@ -427,7 +429,7 @@ impl Segment {
|
|||
encoded_values_buf = col.all_encoded_values(encoded_values_buf);
|
||||
}
|
||||
}
|
||||
encoded_values_buf
|
||||
encoded_values_buf.take_u32()
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -451,51 +453,48 @@ impl Segment {
|
|||
aggregate_columns_data.push(column_values);
|
||||
}
|
||||
|
||||
self.read_group_hash_with_vec_key(dst, &groupby_encoded_ids, &aggregate_columns_data)
|
||||
}
|
||||
|
||||
fn read_group_hash_with_vec_key<'a>(
|
||||
&'a self,
|
||||
dst: &mut ReadGroupResult<'a>,
|
||||
groupby_encoded_ids: &[Vec<u32>],
|
||||
aggregate_columns_data: &[Values<'a>],
|
||||
) {
|
||||
// Now begin building the group keys.
|
||||
let mut groups = HashMap::new();
|
||||
let mut groups: HashMap<Vec<u32>, Vec<AggregateResult<'_>>> = HashMap::default();
|
||||
|
||||
// key_buf will be used as a temporary buffer for group keys, which are
|
||||
// themselves integers.
|
||||
let mut key_buf = vec![0; group_cols_num];
|
||||
let mut key_buf = vec![0; dst.group_columns.len()];
|
||||
|
||||
for row in 0..groupby_encoded_ids[0].len() {
|
||||
// update the group key buffer with the group key for this row
|
||||
for (j, col_ids) in groupby_encoded_ids.iter().enumerate() {
|
||||
match col_ids {
|
||||
EncodedValues::I64(ids) => {
|
||||
key_buf[j] = ids[row];
|
||||
}
|
||||
EncodedValues::U32(ids) => {
|
||||
// TODO(edd): hmmmm. This is unfortunate - we only need
|
||||
// the encoded values to be 64-bit integers if we are grouping
|
||||
// by time (i64 column).
|
||||
key_buf[j] = ids[row] as i64;
|
||||
}
|
||||
}
|
||||
key_buf[j] = col_ids[row];
|
||||
}
|
||||
|
||||
// entry API requires allocating a key, which is too expensive.
|
||||
if !groups.contains_key(&key_buf) {
|
||||
// this vector will hold aggregates for this group key, which
|
||||
// will be updated as the rows in the aggregate columns are
|
||||
// iterated.
|
||||
let mut group_key_aggs = Vec::with_capacity(agg_cols_num);
|
||||
for (_, agg_type) in &dst.aggregate_columns {
|
||||
group_key_aggs.push(AggregateResult::from(agg_type));
|
||||
match groups.raw_entry_mut().from_key(&key_buf) {
|
||||
// aggregates for this group key are already present. Update them
|
||||
hash_map::RawEntryMut::Occupied(mut entry) => {
|
||||
for (i, values) in aggregate_columns_data.iter().enumerate() {
|
||||
entry.get_mut()[i].update(values.value(row));
|
||||
}
|
||||
}
|
||||
// group key does not exist, so create it.
|
||||
hash_map::RawEntryMut::Vacant(entry) => {
|
||||
let mut group_key_aggs = Vec::with_capacity(dst.aggregate_columns.len());
|
||||
for (_, agg_type) in &dst.aggregate_columns {
|
||||
group_key_aggs.push(AggregateResult::from(agg_type));
|
||||
}
|
||||
|
||||
for (i, values) in aggregate_columns_data.iter().enumerate() {
|
||||
group_key_aggs[i].update(values.value(row));
|
||||
for (i, values) in aggregate_columns_data.iter().enumerate() {
|
||||
group_key_aggs[i].update(values.value(row));
|
||||
}
|
||||
|
||||
entry.insert(key_buf.clone(), group_key_aggs);
|
||||
}
|
||||
|
||||
groups.insert(key_buf.clone(), group_key_aggs);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Group key already exists - update all aggregates for that group key
|
||||
let group_key_aggs = groups.get_mut(&key_buf).unwrap();
|
||||
for (i, values) in aggregate_columns_data.iter().enumerate() {
|
||||
group_key_aggs[i].update(values.value(row));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue