From fcc978bb751a1bf8c384c71750f5be6fc4923577 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 24 Feb 2021 22:20:51 +0000 Subject: [PATCH] refactor: wire up distinct_values with iterator --- read_buffer/src/column.rs | 80 +++++++++--- read_buffer/src/column/encoding/dictionary.rs | 45 +++---- .../src/column/encoding/dictionary/plain.rs | 14 +-- .../src/column/encoding/dictionary/rle.rs | 8 +- read_buffer/src/column/string.rs | 8 +- read_buffer/src/row_group.rs | 118 ++++++++++++++++++ 6 files changed, 212 insertions(+), 61 deletions(-) diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 67c8a61a20..60cf71254f 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -13,7 +13,7 @@ use either::Either; use arrow_deps::{arrow, arrow::array::Array}; use crate::schema::LogicalDataType; -use crate::value::{EncodedValues, OwnedValue, Scalar, Value, ValueSet, Values}; +use crate::value::{EncodedValues, OwnedValue, Scalar, Value, Values}; use boolean::BooleanEncoding; use encoding::{bool, dictionary, fixed_null}; use float::FloatEncoding; @@ -240,18 +240,24 @@ impl Column { } // The distinct set of values found at the logical row ids. - pub fn distinct_values(&self, row_ids: &[u32]) -> ValueSet<'_> { - assert!( - row_ids.len() as u32 <= self.num_rows(), - "too many row ids {:?} provided for column with {:?} rows", - row_ids.len(), - self.num_rows() - ); - + pub fn distinct_values(&self, row_ids: impl Iterator) -> BTreeSet> { match &self { Column::String(_, data) => data.distinct_values(row_ids), - Column::ByteArray(_, _) => todo!(), - _ => unimplemented!("distinct values is not implemented for this type"), + Column::Float(_, _) => { + unimplemented!("distinct values is not implemented for Float column") + } + Column::Integer(_, _) => { + unimplemented!("distinct values is not implemented for Integer column") + } + Column::Unsigned(_, _) => { + unimplemented!("distinct values is not implemented for Unsigned column") + } + Column::Bool(_, _) => { + unimplemented!("distinct values is not implemented for Bool column") + } + Column::ByteArray(_, _) => { + unimplemented!("distinct values is not implemented for ByteArray column") + } } } @@ -679,7 +685,7 @@ impl Column { /// Determines if the column contains other values than those provided in /// `values`. - pub fn contains_other_values(&self, values: &BTreeSet>) -> bool { + pub fn has_other_values(&self, values: &BTreeSet) -> bool { todo!() } } @@ -1107,10 +1113,11 @@ pub enum RowIDsOption { impl RowIDsOption { /// Returns the `Some` variant or panics. pub fn unwrap(&self) -> &RowIDs { - if let Self::Some(ids) = self { - return ids; + match &self { + RowIDsOption::None(_) => panic!("cannot unwrap RowIDsOption to RowIDs"), + RowIDsOption::Some(ids) => ids, + RowIDsOption::All(ids) => ids, } - panic!("cannot unwrap RowIDsOption to RowIDs"); } } @@ -1159,6 +1166,15 @@ impl RowIDs { } } + /// Returns an iterator over the contents of the RowIDs. + pub fn iter(&self) -> RowIDsIterator<'_> { + match self { + RowIDs::Bitmap(bm) => RowIDsIterator::new(bm.iter()), + // we want an iterator of u32 rather than &u32. + RowIDs::Vector(vec) => RowIDsIterator::new(vec.iter().cloned()), + } + } + // Converts the RowIDs to a Vec. This is expensive and should only be // used for testing. pub fn to_vec(&self) -> Vec { @@ -1241,6 +1257,24 @@ impl RowIDs { } } +pub struct RowIDsIterator<'a> { + itr: Box + 'a>, +} + +impl<'a> RowIDsIterator<'a> { + fn new(itr: impl Iterator + 'a) -> Self { + Self { itr: Box::new(itr) } + } +} + +impl Iterator for RowIDsIterator<'_> { + type Item = u32; + + fn next(&mut self) -> Option { + self.itr.next() + } +} + #[cfg(test)] mod test { use super::*; @@ -1484,15 +1518,21 @@ mod test { Some("world"), ]; - let hello = "hello".to_string(); - let world = "world".to_string(); let mut exp = BTreeSet::new(); - exp.insert(Some(&hello)); - exp.insert(Some(&world)); + exp.insert(Some("hello")); + exp.insert(Some("world")); exp.insert(None); let col = Column::from(&input[..]); - assert_eq!(col.distinct_values(&[0, 1, 2, 3, 4]), ValueSet::String(exp)); + assert_eq!(col.distinct_values(vec![0, 1, 2, 3, 4].into_iter()), exp); + assert_eq!( + col.distinct_values(RowIDs::Vector(vec![0, 1, 2, 3, 4]).iter()), + exp + ); + + let mut bm = Bitmap::create(); + bm.add_many(&[0, 1, 2, 3, 4]); + assert_eq!(col.distinct_values(RowIDs::Bitmap(bm).iter()), exp); } #[test] diff --git a/read_buffer/src/column/encoding/dictionary.rs b/read_buffer/src/column/encoding/dictionary.rs index 34d673fbe7..615dfcd6f8 100644 --- a/read_buffer/src/column/encoding/dictionary.rs +++ b/read_buffer/src/column/encoding/dictionary.rs @@ -237,9 +237,9 @@ impl Encoding { /// increasing set. fn distinct_values<'a>( &'a self, - row_ids: &[u32], - dst: BTreeSet>, - ) -> BTreeSet> { + row_ids: impl Iterator, + dst: BTreeSet>, + ) -> BTreeSet> { match self { Encoding::RLE(enc) => enc.distinct_values(row_ids, dst), Encoding::Plain(enc) => enc.distinct_values(row_ids, dst), @@ -1046,12 +1046,10 @@ mod test { enc.push_additional(Some("east".to_string()), 3); - let values = enc.distinct_values((0..3).collect::>().as_slice(), BTreeSet::new()); + let values = enc.distinct_values((0..3).collect::>().into_iter(), BTreeSet::new()); assert_eq!( values, - vec![Some(&"east".to_string())] - .into_iter() - .collect::>(), + vec![Some("east")].into_iter().collect::>(), "{}", name, ); @@ -1061,35 +1059,30 @@ mod test { enc.push_additional(Some("south".to_string()), 2); // 9, 10 enc.push_none(); // 11 - let values = enc.distinct_values((0..12).collect::>().as_slice(), BTreeSet::new()); + let values = enc.distinct_values((0..12).collect::>().into_iter(), BTreeSet::new()); assert_eq!( values, - vec![ - None, - Some(&"east".to_string()), - Some(&"north".to_string()), - Some(&"south".to_string()), - ] - .into_iter() - .collect::>(), - "{}", - name, - ); - - let values = enc.distinct_values((0..4).collect::>().as_slice(), BTreeSet::new()); - assert_eq!( - values, - vec![Some(&"east".to_string()), Some(&"north".to_string()),] + vec![None, Some("east"), Some("north"), Some("south"),] .into_iter() .collect::>(), "{}", name, ); - let values = enc.distinct_values(&[3, 10], BTreeSet::new()); + let values = enc.distinct_values((0..4).collect::>().into_iter(), BTreeSet::new()); assert_eq!( values, - vec![Some(&"north".to_string()), Some(&"south".to_string()),] + vec![Some("east"), Some("north"),] + .into_iter() + .collect::>(), + "{}", + name, + ); + + let values = enc.distinct_values(vec![3, 10].into_iter(), BTreeSet::new()); + assert_eq!( + values, + vec![Some("north"), Some("south"),] .into_iter() .collect::>(), "{}", diff --git a/read_buffer/src/column/encoding/dictionary/plain.rs b/read_buffer/src/column/encoding/dictionary/plain.rs index 3a406f560c..1455cf8980 100644 --- a/read_buffer/src/column/encoding/dictionary/plain.rs +++ b/read_buffer/src/column/encoding/dictionary/plain.rs @@ -618,19 +618,19 @@ impl Plain { /// increasing set. pub fn distinct_values<'a>( &'a self, - row_ids: &[u32], - mut dst: BTreeSet>, - ) -> BTreeSet> { + row_ids: impl Iterator, + mut dst: BTreeSet>, + ) -> BTreeSet> { // TODO(edd): Perf... We can improve on this if we know the column is // totally ordered. dst.clear(); - for &row_id in row_ids { + for row_id in row_ids { let encoded_id = self.encoded_data[row_id as usize]; - let value = &self.entries[encoded_id as usize].as_ref(); - if !dst.contains(value) { - dst.insert(*value); + let value = self.entries[encoded_id as usize].as_deref(); + if !dst.contains(&value) { + dst.insert(value); } if dst.len() as u32 == self.cardinality() { diff --git a/read_buffer/src/column/encoding/dictionary/rle.rs b/read_buffer/src/column/encoding/dictionary/rle.rs index e2fb72994a..d23ef4c945 100644 --- a/read_buffer/src/column/encoding/dictionary/rle.rs +++ b/read_buffer/src/column/encoding/dictionary/rle.rs @@ -664,9 +664,9 @@ impl RLE { /// increasing set. pub fn distinct_values<'a>( &'a self, - row_ids: &[u32], - mut dst: BTreeSet>, - ) -> BTreeSet> { + row_ids: impl Iterator, + mut dst: BTreeSet>, + ) -> BTreeSet> { // TODO(edd): Perf... We can improve on this if we know the column is // totally ordered. dst.clear(); @@ -689,7 +689,7 @@ impl RLE { let mut i = 1; 'by_row: for row_id in row_ids { - while curr_logical_row_id + curr_entry_rl <= *row_id { + while curr_logical_row_id + curr_entry_rl <= row_id { // this encoded entry does not cover the row we need. // move on to next entry curr_logical_row_id += curr_entry_rl; diff --git a/read_buffer/src/column/string.rs b/read_buffer/src/column/string.rs index 1f95c3a4e0..e1acf34869 100644 --- a/read_buffer/src/column/string.rs +++ b/read_buffer/src/column/string.rs @@ -3,8 +3,8 @@ use std::collections::BTreeSet; use arrow_deps::arrow::{self, array::Array}; use either::Either; +use super::cmp; use super::encoding::dictionary::{Encoding, Plain, RLE}; -use super::{cmp, ValueSet}; use crate::column::{RowIDs, Value, Values}; // Edd's totally made up magic constant. This determines whether we would use @@ -142,10 +142,10 @@ impl StringEncoding { /// Returns the distinct set of values found at the provided row ids. /// /// TODO(edd): perf - pooling of destination sets. - pub fn distinct_values(&self, row_ids: &[u32]) -> ValueSet<'_> { + pub fn distinct_values(&self, row_ids: impl Iterator) -> BTreeSet> { match &self { - Self::RLEDictionary(c) => ValueSet::String(c.distinct_values(row_ids, BTreeSet::new())), - Self::Dictionary(c) => ValueSet::String(c.distinct_values(row_ids, BTreeSet::new())), + Self::RLEDictionary(c) => c.distinct_values(row_ids, BTreeSet::new()), + Self::Dictionary(c) => c.distinct_values(row_ids, BTreeSet::new()), } } diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 1ef1d0cfae..57ca2a5f07 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -992,6 +992,62 @@ impl RowGroup { } } } + + /// Returns the distinct set of values for the selected columns, constrained + /// by an optional predicate. + pub fn column_values<'a>( + &'a self, + predicate: &Predicate, + columns: &[ColumnName<'_>], + mut dst: BTreeMap>, + ) -> BTreeMap> { + // Build up candidate columns + let candidate_columns = self + .all_columns_by_name + .iter() + // Filter any columns that are not present in the `Selection`. + .filter_map(|(name, &id)| { + if columns.iter().any(|selection| name == selection) { + Some((name, &self.columns[id])) + } else { + None + } + }) + // Further filter candidate columns by removing any columns that we + // can prove we already have all the distinct values for. + .filter(|(name, column)| { + match dst.get(*name) { + // process the column if we haven't got all the distinct + // values. + Some(values) => column.has_other_values(values), + // no existing values for this column - we will need to + // process it. + None => true, + } + }) + .collect::>(); + + let row_ids = self.row_ids_from_predicate(predicate); + for (name, column) in candidate_columns { + // If no rows match there is nothing to do, if some rows match then + // extract an iterator of those IDs. If all rows match then create + // an iterator of all rows without materialising them. + let row_itr: Box> = match &row_ids { + RowIDsOption::None(_) => return dst, + RowIDsOption::Some(row_ids) => Box::new(row_ids.iter()), + RowIDsOption::All(_) => Box::new(0..self.rows()), + }; + + let results = dst.entry(name.clone()).or_default(); + for value in column.distinct_values(row_itr).iter() { + if value.is_some() && !results.contains(value.unwrap()) { + results.insert(value.unwrap().to_owned()); + } + } + } + + dst + } } /// Initialise a `RowGroup` from an Arrow RecordBatch. @@ -2946,4 +3002,66 @@ west,host-d,11,9 vec!["temp".to_owned()], ); } + + fn to_map<'a>(arr: Vec<(&str, &[&'a str])>) -> BTreeMap> { + arr.iter() + .map(|(k, values)| { + ( + k.to_string(), + values + .iter() + .map(|s| s.to_string()) + .collect::>(), + ) + }) + .collect::>() + } + + #[test] + fn column_values() { + // Build a row group. + let mut columns = BTreeMap::new(); + let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3][..])); + columns.insert("time".to_string(), tc); + + let rc = ColumnType::Tag(Column::from(&["west", "south", "north"][..])); + columns.insert("region".to_string(), rc); + + let ec = ColumnType::Tag(Column::from(&["prod", "stag", "stag"][..])); + columns.insert("env".to_string(), ec); + + let rg = RowGroup::new(3, columns); + + let result = rg.column_values(&Predicate::default(), &["region"], BTreeMap::new()); + assert_eq!( + result, + to_map(vec![("region", &["north", "west", "south"])]) + ); + + let result = rg.column_values(&Predicate::default(), &["env", "region"], BTreeMap::new()); + assert_eq!( + result, + to_map(vec![ + ("env", &["prod", "stag"]), + ("region", &["north", "west", "south"]) + ]) + ); + + let result = rg.column_values( + &Predicate::new(vec![BinaryExpr::from(("time", ">", 1_i64))]), + &["env", "region"], + BTreeMap::new(), + ); + assert_eq!( + result, + to_map(vec![("env", &["stag"]), ("region", &["north", "south"])]) + ); + + let result = rg.column_values( + &Predicate::new(vec![BinaryExpr::from(("time", ">", 4_i64))]), + &["env", "region"], + BTreeMap::new(), + ); + assert_eq!(result, to_map(vec![])); + } }