diff --git a/delorean_mem_qe/src/column.rs b/delorean_mem_qe/src/column.rs index bc89cb23bd..b3df18edca 100644 --- a/delorean_mem_qe/src/column.rs +++ b/delorean_mem_qe/src/column.rs @@ -537,6 +537,22 @@ impl Column { } } + /// Materialise all of the encoded values. + pub fn all_encoded_values(&self) -> Vector { + match self { + Column::String(c) => { + let now = std::time::Instant::now(); + let v = c.all_encoded_values(); + log::debug!("time getting all encoded values {:?}", now.elapsed()); + + log::debug!("dictionary {:?}", c.data.dictionary()); + Vector::Integer(v) + } + Column::Float(c) => Vector::Float(c.all_encoded_values()), + Column::Integer(c) => Vector::Integer(c.all_encoded_values()), + } + } + /// Given an encoded value for a row, materialise and return the decoded /// version. /// @@ -986,6 +1002,10 @@ impl String { self.data.encoded_values(row_ids) } + pub fn all_encoded_values(&self) -> Vec { + self.data.all_encoded_values() + } + /// Return the decoded value for an encoded ID. /// /// Panics if there is no decoded value for the provided id @@ -1037,6 +1057,10 @@ impl Float { self.data.encoded_values(row_ids) } + pub fn all_encoded_values(&self) -> Vec { + self.data.all_encoded_values() + } + pub fn scan_from(&self, row_id: usize) -> &[f64] { self.data.scan_from(row_id) } @@ -1106,6 +1130,10 @@ impl Integer { self.data.encoded_values(row_ids) } + pub fn all_encoded_values(&self) -> Vec { + self.data.all_encoded_values() + } + pub fn scan_from(&self, row_id: usize) -> &[i64] { self.data.scan_from(row_id) } diff --git a/delorean_mem_qe/src/encoding.rs b/delorean_mem_qe/src/encoding.rs index d6a865a5f1..4b057cfc96 100644 --- a/delorean_mem_qe/src/encoding.rs +++ b/delorean_mem_qe/src/encoding.rs @@ -68,6 +68,12 @@ where self.values(row_ids) } + /// Return all encoded values. For this encoding this is just the decoded + /// values + pub fn all_encoded_values(&self) -> Vec { + self.values.clone() + } + // TODO(edd): fix this when added NULL support pub fn scan_from_until_some(&self, _row_id: usize) -> Option { unreachable!("to remove"); @@ -485,6 +491,26 @@ impl DictionaryRLE { out } + // values materialises a vector of references to all logical values in the + // encoding. + pub fn all_values(&mut self) -> Vec> { + let mut out: Vec> = Vec::with_capacity(self.total as usize); + + // build reverse mapping. + let mut idx_value = BTreeMap::new(); + for (k, v) in &self.entry_index { + idx_value.insert(v, k); + } + assert_eq!(idx_value.len(), self.entry_index.len()); + + for (idx, rl) in &self.run_lengths { + // TODO(edd): fix unwrap - we know that the value exists in map... + let v = idx_value.get(&idx).unwrap().as_ref(); + out.extend(iter::repeat(v).take(*rl as usize)); + } + out + } + /// Return the decoded value for an encoded ID. /// /// Panics if there is no decoded value for the provided id @@ -528,22 +554,13 @@ impl DictionaryRLE { out } - // values materialises a vector of references to all logical values in the - // encoding. - pub fn all_values(&mut self) -> Vec> { - let mut out: Vec> = Vec::with_capacity(self.total as usize); - - // build reverse mapping. - let mut idx_value = BTreeMap::new(); - for (k, v) in &self.entry_index { - idx_value.insert(v, k); - } - assert_eq!(idx_value.len(), self.entry_index.len()); + // all_encoded_values materialises a vector of all encoded values for the + // column. + pub fn all_encoded_values(&self) -> Vec { + let mut out: Vec = Vec::with_capacity(self.total as usize); for (idx, rl) in &self.run_lengths { - // TODO(edd): fix unwrap - we know that the value exists in map... - let v = idx_value.get(&idx).unwrap().as_ref(); - out.extend(iter::repeat(v).take(*rl as usize)); + out.extend(iter::repeat(*idx as i64).take(*rl as usize)); } out } diff --git a/delorean_mem_qe/src/segment.rs b/delorean_mem_qe/src/segment.rs index c058df01f1..f8c500593e 100644 --- a/delorean_mem_qe/src/segment.rs +++ b/delorean_mem_qe/src/segment.rs @@ -228,7 +228,7 @@ impl Segment { group_columns: &[String], aggregates: &[(String, AggregateType)], window: i64, - ) -> BTreeMap, Vec<(String, Option)>> { + ) -> BTreeMap, Vec<(&String, &AggregateType, Option)>> { // Build a hash table - essentially, scan columns for matching row ids, // emitting the encoded value for each column and track those value // combinations in a hashmap with running aggregates. @@ -242,6 +242,10 @@ impl Segment { assert_ne!(group_columns[group_columns.len() - 1], "time"); } + // TODO(edd): Perf - if there is no predicate and we want entire segment + // then it will be a lot faster to not build filtered_row_ids and just + // get all encoded values for each grouping column... + // filter on predicates and time let filtered_row_ids: croaring::Bitmap; if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) { @@ -263,7 +267,12 @@ impl Segment { let mut group_column_encoded_values = Vec::with_capacity(group_columns.len()); for group_column in group_columns { if let Some(column) = self.column(&group_column) { - let encoded_values = column.encoded_values(&filtered_row_ids_vec); + let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows { + column.all_encoded_values() + } else { + column.encoded_values(&filtered_row_ids_vec) + }; + assert_eq!( filtered_row_ids.cardinality() as usize, encoded_values.len() @@ -325,10 +334,10 @@ impl Segment { .collect::>(); // hashMap is about 20% faster than BTreeMap in this case - let mut hash_table: HashMap< + let mut hash_table: BTreeMap< Vec, Vec<(&String, &AggregateType, Option)>, - > = HashMap::new(); + > = BTreeMap::new(); let mut aggregate_row: Vec<(&str, Option)> = std::iter::repeat_with(|| ("", None)) @@ -406,8 +415,10 @@ impl Segment { } processed_rows += 1; } + // println!("groups: {:?}", hash_table.len()); log::debug!("({:?} rows processed) {:?}", processed_rows, hash_table); BTreeMap::new() + // hash_table } pub fn aggregate_by_group_using_sort( @@ -451,7 +462,11 @@ impl Segment { let mut group_column_encoded_values = Vec::with_capacity(group_columns.len()); for group_column in group_columns { if let Some(column) = self.column(&group_column) { - let encoded_values = column.encoded_values(&filtered_row_ids_vec); + let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows { + column.all_encoded_values() + } else { + column.encoded_values(&filtered_row_ids_vec) + }; assert_eq!( filtered_row_ids.cardinality() as usize, encoded_values.len() @@ -557,6 +572,10 @@ impl Segment { assert_ne!(group_columns[group_columns.len() - 1], "time"); } + // TODO(edd): Perf - if there is no predicate and we want entire segment + // then it will be a lot faster to not build filtered_row_ids and just + // get all encoded values for each grouping column... + // filter on predicates and time let filtered_row_ids: croaring::Bitmap; if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) { @@ -577,7 +596,11 @@ impl Segment { let mut group_column_encoded_values = Vec::with_capacity(group_columns.len()); for group_column in group_columns { if let Some(column) = self.column(&group_column) { - let encoded_values = column.encoded_values(&filtered_row_ids_vec); + let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows { + column.all_encoded_values() + } else { + column.encoded_values(&filtered_row_ids_vec) + }; assert_eq!( filtered_row_ids.cardinality() as usize, encoded_values.len() @@ -709,6 +732,7 @@ impl Segment { aggregates: group_key_aggregates, }); + // println!("groups: {:?}", results.len()); log::debug!("({:?} rows processed) {:?}", processed_rows, results); // results vec![]