feat: grouping and aggregate

pull/24376/head
Edd Robinson 2020-08-06 18:41:56 +01:00
parent e3e1611e82
commit da9d3cd528
4 changed files with 137 additions and 9 deletions

View File

@ -84,7 +84,29 @@ fn main() {
}
println!("{:?}", rows.cardinality());
time_row_by_preds(&store);
// time_row_by_preds(&store);
let group_ids = segments
.segments()
.last()
.unwrap()
.group_by_column_ids("env")
.unwrap();
for (col_values, row_ids) in group_ids {
let (min, max) = segments.segments().last().unwrap().time_range();
println!(
"({:?}, {:?}) SUM OF COLUMN env={:?} is {:?}",
min,
max,
col_values,
segments
.segments()
.last()
.unwrap()
.sum_column(&"counter", &row_ids)
);
}
}
fn build_store(

View File

@ -245,6 +245,24 @@ impl Column {
}
}
pub fn sum_by_ids(&self, row_ids: &croaring::Bitmap) -> Option<Scalar> {
match self {
Column::String(_) => unimplemented!("not implemented"),
Column::Float(c) => Some(Scalar::Float(c.sum_by_ids(row_ids))),
Column::Integer(_) => unimplemented!("not implemented"),
}
}
pub fn group_by_ids(
&self,
) -> &std::collections::BTreeMap<Option<std::string::String>, croaring::Bitmap> {
match self {
Column::String(c) => c.data.group_row_ids(),
Column::Float(_) => unimplemented!("not implemented"),
Column::Integer(_) => unimplemented!("not implemented"),
}
}
// TODO(edd) shouldn't let roaring stuff leak out...
pub fn row_ids_eq(&self, value: Option<&Scalar>) -> Option<croaring::Bitmap> {
if !self.maybe_contains(value) {
@ -353,6 +371,13 @@ impl String {
unreachable!("don't need this");
// self.data.scan_from_until_some(row_id)
}
// TODO(edd) shouldn't let roaring stuff leak out...
pub fn group_row_ids(
&self,
) -> &std::collections::BTreeMap<Option<std::string::String>, croaring::Bitmap> {
self.data.group_row_ids()
}
}
#[derive(Debug, Default)]
@ -383,6 +408,10 @@ impl Float {
pub fn scan_from_until_some(&self, row_id: usize) -> Option<f64> {
self.data.scan_from_until_some(row_id)
}
pub fn sum_by_ids(&self, row_ids: &croaring::Bitmap) -> f64 {
self.data.sum_by_ids(row_ids)
}
}
impl From<&[f64]> for Float {

View File

@ -23,7 +23,7 @@ pub struct PlainFixed<T> {
impl<T> PlainFixed<T>
where
T: PartialEq + PartialOrd + Copy + std::fmt::Debug,
T: Default + PartialEq + PartialOrd + Copy + std::fmt::Debug + std::ops::AddAssign,
{
pub fn size(&self) -> usize {
self.values.len() * std::mem::size_of::<T>()
@ -88,6 +88,17 @@ where
}
bm
}
// TODO(edd): make faster
pub fn sum_by_ids(&self, row_ids: &croaring::Bitmap) -> T {
let mut res = T::default();
row_ids.iter().for_each(|x| res += self.value(x as usize));
res
}
pub fn count_by_ids(&self, row_ids: &croaring::Bitmap) -> u64 {
row_ids.cardinality()
}
}
impl From<&[i64]> for PlainFixed<i64> {
@ -113,6 +124,9 @@ pub struct DictionaryRLE {
// stores the mapping between an entry and its assigned index.
entry_index: BTreeMap<Option<String>, usize>,
// Experiment - store rows that each entry has a value for
entry_row_ids: BTreeMap<Option<String>, croaring::Bitmap>,
// stores the mapping between an index and its entry.
index_entry: BTreeMap<usize, Option<String>>,
@ -130,6 +144,7 @@ impl DictionaryRLE {
pub fn new() -> Self {
Self {
entry_index: BTreeMap::new(),
entry_row_ids: BTreeMap::new(),
index_entry: BTreeMap::new(),
map_size: 0,
run_lengths: Vec::new(),
@ -147,7 +162,6 @@ impl DictionaryRLE {
}
pub fn push_additional(&mut self, v: Option<String>, additional: u64) {
self.total += additional;
let idx = self.entry_index.get(&v);
match idx {
Some(idx) => {
@ -160,6 +174,10 @@ impl DictionaryRLE {
self.run_lengths.push((*idx, additional));
self.run_length_size += std::mem::size_of::<(usize, u64)>();
}
self.entry_row_ids
.get_mut(&v)
.unwrap()
.add_range(self.total..self.total + additional);
}
}
None => {
@ -168,18 +186,24 @@ impl DictionaryRLE {
let idx = self.entry_index.len();
self.entry_index.insert(v.clone(), idx);
self.entry_row_ids
.insert(v.clone(), croaring::Bitmap::create());
if let Some(value) = &v {
self.map_size += value.len();
}
self.index_entry.insert(idx, v);
self.index_entry.insert(idx, v.clone());
self.map_size += 8 + std::mem::size_of::<usize>(); // TODO(edd): clean this option size up
self.run_lengths.push((idx, additional));
self.entry_row_ids
.get_mut(&v)
.unwrap()
.add_range(self.total..self.total + additional);
self.run_length_size += std::mem::size_of::<(usize, u64)>();
return;
}
}
}
self.total += additional;
}
// row_ids returns an iterator over the set of row ids matching the provided
@ -216,6 +240,11 @@ impl DictionaryRLE {
bm
}
// get the set of row ids for each distinct value
pub fn group_row_ids(&self) -> &BTreeMap<Option<String>, croaring::Bitmap> {
&self.entry_row_ids
}
// row_ids returns an iterator over the set of row ids matching the provided
// value
// pub fn row_ids(&'a self, value: &str) -> impl iter::Iterator<Item = usize> {
@ -457,6 +486,27 @@ mod test {
assert_eq!(drle.value(6).unwrap(), "zoo");
assert_eq!(drle.value(7).unwrap(), "zoo");
assert_eq!(drle.value(8).unwrap(), "zoo");
let row_ids = drle
.entry_row_ids
.get(&Some("hello".to_string()))
.unwrap()
.to_vec();
assert_eq!(row_ids, vec![0, 1, 3, 4, 5]);
let row_ids = drle
.entry_row_ids
.get(&Some("world".to_string()))
.unwrap()
.to_vec();
assert_eq!(row_ids, vec![2]);
let row_ids = drle
.entry_row_ids
.get(&Some("zoo".to_string()))
.unwrap()
.to_vec();
assert_eq!(row_ids, vec![6, 7, 8]);
}
#[test]

View File

@ -118,6 +118,23 @@ impl Segment {
)
}
pub fn group_by_column_ids(
&self,
name: &str,
) -> Option<&std::collections::BTreeMap<Option<std::string::String>, croaring::Bitmap>> {
if let Some(c) = self.column(name) {
return Some(c.group_by_ids());
}
None
}
pub fn sum_column(&self, name: &str, row_ids: &croaring::Bitmap) -> Option<column::Scalar> {
if let Some(c) = self.column(name) {
return c.sum_by_ids(row_ids);
}
None
}
pub fn filter_by_predicate_eq(
&self,
time_range: Option<(i64, i64)>,
@ -156,21 +173,31 @@ impl Segment {
}
// now intersect matching rows for each column
let mut bm = bm.unwrap();
// let mut bm = bm.unwrap();
for (col_pred_name, col_pred_value) in predicates {
if let Some(c) = self.column(col_pred_name) {
match c.row_ids_eq(col_pred_value) {
Some(row_ids) => {
bm.and_inplace(&row_ids);
if bm.is_empty() {
if row_ids.is_empty() {
return None;
}
match &mut bm {
Some(all) => {
all.and_inplace(&row_ids);
if all.is_empty() {
// no rows intersect
return None;
}
}
None => bm = Some(row_ids),
}
}
None => return None, // if this predicate doesn't match then no rows match
}
}
}
Some(bm)
bm
}
}