diff --git a/Cargo.lock b/Cargo.lock index 74118f4c09..475891fef3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2793,6 +2793,7 @@ dependencies = [ "croaring", "data_types", "either", + "hashbrown", "itertools 0.9.0", "packers", "permutation", diff --git a/segment_store/Cargo.toml b/segment_store/Cargo.toml index 6ecdf3e796..bcf4cd433c 100644 --- a/segment_store/Cargo.toml +++ b/segment_store/Cargo.toml @@ -15,6 +15,7 @@ croaring = "0.4.5" itertools = "0.9.0" either = "1.6.1" permutation = "0.2.5" +hashbrown = "0.9.1" [dev-dependencies] criterion = "0.3.3" diff --git a/segment_store/src/column.rs b/segment_store/src/column.rs index 83b8344ec9..02685a2f95 100644 --- a/segment_store/src/column.rs +++ b/segment_store/src/column.rs @@ -2558,6 +2558,14 @@ impl EncodedValues { panic!("cannot borrow &Vec<u32>"); } + /// Takes a `Vec<u32>` out of the enum. + pub fn take_u32(&mut self) -> Vec<u32> { + std::mem::take(match self { + Self::I64(a) => panic!("cannot take Vec<u32> out of I64 variant"), + Self::U32(arr) => arr, + }) + } + pub fn len(&self) -> usize { match self { Self::I64(v) => v.len(), diff --git a/segment_store/src/segment.rs b/segment_store/src/segment.rs index 9e79fc398c..73dfa0310e 100644 --- a/segment_store/src/segment.rs +++ b/segment_store/src/segment.rs @@ -1,8 +1,6 @@ -use std::{ - borrow::Cow, - collections::{BTreeMap, HashMap}, -}; +use std::{borrow::Cow, collections::BTreeMap}; +use hashbrown::{hash_map, HashMap}; use itertools::Itertools; use arrow_deps::arrow::datatypes::SchemaRef; @@ -338,10 +336,14 @@ impl Segment { ) } - /// Right now, predicates are treated as conjunctive (AND) predicates. + /// Returns a set of group keys and aggregated column data associated with + /// them. `read_group` currently only supports grouping on columns that have + /// integer encoded representations - typically "tag columns". + /// + /// Right now, predicates are treated as conjunctive (AND) predicates. /// `read_group` does not guarantee any sort order.
Ordering of results - /// should be handled high up in the `Table` section of the segment - /// store, where multiple segment results may need to be merged. + /// should be handled high up in the `Table` section of the segment store, + /// where multiple segment results may need to be merged. pub fn read_group( &self, predicates: &[Predicate<'_>], @@ -427,7 +429,7 @@ impl Segment { encoded_values_buf = col.all_encoded_values(encoded_values_buf); } } - encoded_values_buf + encoded_values_buf.take_u32() }) .collect(); @@ -451,51 +453,48 @@ impl Segment { aggregate_columns_data.push(column_values); } + self.read_group_hash_with_vec_key(dst, &groupby_encoded_ids, &aggregate_columns_data) + } + + fn read_group_hash_with_vec_key<'a>( + &'a self, + dst: &mut ReadGroupResult<'a>, + groupby_encoded_ids: &[Vec<u32>], + aggregate_columns_data: &[Values<'a>], + ) { // Now begin building the group keys. - let mut groups = HashMap::new(); + let mut groups: HashMap<Vec<u32>, Vec<AggregateResult<'_>>> = HashMap::default(); // key_buf will be used as a temporary buffer for group keys, which are // themselves integers. - let mut key_buf = vec![0; group_cols_num]; + let mut key_buf = vec![0; dst.group_columns.len()]; for row in 0..groupby_encoded_ids[0].len() { // update the group key buffer with the group key for this row for (j, col_ids) in groupby_encoded_ids.iter().enumerate() { - match col_ids { - EncodedValues::I64(ids) => { - key_buf[j] = ids[row]; - } - EncodedValues::U32(ids) => { - // TODO(edd): hmmmm. This is unfortunate - we only need - // the encoded values to be 64-bit integers if we are grouping - // by time (i64 column). - key_buf[j] = ids[row] as i64; - } - } + key_buf[j] = col_ids[row]; } - // entry API requires allocating a key, which is too expensive. - if !groups.contains_key(&key_buf) { - // this vector will hold aggregates for this group key, which - // will be updated as the rows in the aggregate columns are - // iterated.
- let mut group_key_aggs = Vec::with_capacity(agg_cols_num); - for (_, agg_type) in &dst.aggregate_columns { - group_key_aggs.push(AggregateResult::from(agg_type)); + match groups.raw_entry_mut().from_key(&key_buf) { + // aggregates for this group key are already present. Update them + hash_map::RawEntryMut::Occupied(mut entry) => { + for (i, values) in aggregate_columns_data.iter().enumerate() { + entry.get_mut()[i].update(values.value(row)); + } } + // group key does not exist, so create it. + hash_map::RawEntryMut::Vacant(entry) => { + let mut group_key_aggs = Vec::with_capacity(dst.aggregate_columns.len()); + for (_, agg_type) in &dst.aggregate_columns { + group_key_aggs.push(AggregateResult::from(agg_type)); + } - for (i, values) in aggregate_columns_data.iter().enumerate() { - group_key_aggs[i].update(values.value(row)); + for (i, values) in aggregate_columns_data.iter().enumerate() { + group_key_aggs[i].update(values.value(row)); + } + + entry.insert(key_buf.clone(), group_key_aggs); } - - groups.insert(key_buf.clone(), group_key_aggs); - continue; - } - - // Group key already exists - update all aggregates for that group key - let group_key_aggs = groups.get_mut(&key_buf).unwrap(); - for (i, values) in aggregate_columns_data.iter().enumerate() { - group_key_aggs[i].update(values.value(row)); } }