diff --git a/delorean_mem_qe/src/bin/main.rs b/delorean_mem_qe/src/bin/main.rs index 82e5adc4e4..ae104d35a8 100644 --- a/delorean_mem_qe/src/bin/main.rs +++ b/delorean_mem_qe/src/bin/main.rs @@ -45,8 +45,8 @@ fn main() { let segments = segments .filter_by_time(1590036110000000, 1590044410000000) - .filter_by_predicate_eq("env", &column::Scalar::String("toolsus1")); - let res = segments.first("env").unwrap(); + .filter_by_predicate_eq("env", &column::Scalar::String("prod01-eu-central-1")); + let res = segments.first("env"); println!("{:?}", res); } @@ -61,7 +61,7 @@ fn build_store( Ok(()) } -fn convert_record_batch<'a>(rb: RecordBatch) -> Result { +fn convert_record_batch(rb: RecordBatch) -> Result { let mut segment = Segment::default(); // println!("cols {:?} rows {:?}", rb.num_columns(), rb.num_rows()); @@ -89,23 +89,31 @@ fn convert_record_batch<'a>(rb: RecordBatch) -> Result { segment.add_column(rb.schema().field(i).name(), column); } datatypes::DataType::Utf8 => { - if column.null_count() > 0 { - panic!("null tag"); - } let arr = column .as_any() .downcast_ref::() .unwrap(); let mut c = column::String::default(); - let mut prev = arr.value(0); + let mut prev: Option<&str> = None; + if !column.is_null(0) { + prev = Some(arr.value(0)); + } + let mut count = 1_u64; for j in 1..arr.len() { - let next = arr.value(j); + let mut next = Some(arr.value(j)); + if column.is_null(j) { + next = None; + } + if prev == next { count += 1; } else { - c.add_additional(prev, count); + match prev { + Some(x) => c.add_additional(Some(x.to_string()), count), + None => c.add_additional(None, count), + } prev = next; count = 1; } diff --git a/delorean_mem_qe/src/column.rs b/delorean_mem_qe/src/column.rs index 3676e13c2d..4597cf80c5 100644 --- a/delorean_mem_qe/src/column.rs +++ b/delorean_mem_qe/src/column.rs @@ -41,10 +41,11 @@ impl Column { if row_id >= self.num_rows() { return None; } - if let Some(v) = c.value(row_id) { - return Some(Scalar::String(v)); - }; - None + + match c.value(row_id) { + Some(v) => Some(Scalar::String(v)), + None => None, + } } Column::Float(c) => { if row_id >= self.num_rows() { @@ -65,7 +66,7 @@ impl Column { match self { Column::String(c) => { if let Scalar::String(v) = value { - c.meta.maybe_contains_value(v.to_string()) + c.meta.maybe_contains_value(&v.to_string()) } else { panic!("invalid value"); } @@ -87,19 +88,31 @@ impl Column { } } - pub fn min(&self) -> Scalar { + // FIXME(edd): Support NULL integers and floats + pub fn min(&self) -> Option { match self { - Column::String(c) => Scalar::String(c.meta.range().0), - Column::Float(c) => Scalar::Float(c.meta.range().0), - Column::Integer(c) => Scalar::Integer(c.meta.range().0), + Column::String(c) => { + if let Some(min) = c.meta.range().0 { + return Some(Scalar::String(min)); + } + None + } + Column::Float(c) => Some(Scalar::Float(c.meta.range().0)), + Column::Integer(c) => Some(Scalar::Integer(c.meta.range().0)), } } - pub fn max(&self) -> Scalar { + // FIXME(edd): Support NULL integers and floats + pub fn max(&self) -> Option { match self { - Column::String(c) => Scalar::String(c.meta.range().1), - Column::Float(c) => Scalar::Float(c.meta.range().1), - Column::Integer(c) => Scalar::Integer(c.meta.range().1), + Column::String(c) => { + if let Some(max) = c.meta.range().1 { + return Some(Scalar::String(max)); + } + None + } + Column::Float(c) => Some(Scalar::Float(c.meta.range().1)), + Column::Integer(c) => Some(Scalar::Integer(c.meta.range().1)), } } } @@ -126,16 +139,16 @@ pub struct String { impl String { pub fn add(&mut self, s: &str) { - self.meta.add(s); + self.meta.add(Some(s.to_string())); self.data.push(s); } - pub fn add_additional(&mut self, s: &str, additional: u64) { - self.meta.add(s); + pub fn add_additional(&mut self, s: Option, additional: u64) { + self.meta.add(s.clone()); self.data.push_additional(s, additional); } - pub fn column_range(&self) -> (&str, &str) { + pub fn column_range(&self) -> (Option<&std::string::String>, Option<&std::string::String>) { self.meta.range() } @@ -241,21 +254,21 @@ impl From<&[i64]> for Integer { pub mod metadata { #[derive(Debug, Default)] pub struct Str { - range: (String, String), + range: (Option, Option), num_rows: usize, // sparse_index: BTreeMap, } impl Str { - pub fn add(&mut self, s: &str) { + pub fn add(&mut self, s: Option) { self.num_rows += 1; - if self.range.0.as_str() > s { - self.range.0 = s.to_owned(); + if self.range.0 > s { + self.range.0 = s.clone(); } - if self.range.1.as_str() < s { - self.range.1 = s.to_owned(); + if self.range.1 < s { + self.range.1 = s; } } @@ -263,8 +276,8 @@ pub mod metadata { self.num_rows } - pub fn maybe_contains_value(&self, v: String) -> bool { - let res = self.range.0 <= v && v <= self.range.1; + pub fn maybe_contains_value(&self, v: &str) -> bool { + let res = self.range.0 <= Some(v.to_string()) && Some(v.to_string()) <= self.range.1; println!( "column with ({:?}) maybe contain {:?} -- {:?}", self.range, v, res @@ -272,12 +285,13 @@ pub mod metadata { res } - pub fn range(&self) -> (&str, &str) { - (&self.range.0, &self.range.1) + pub fn range(&self) -> (Option<&String>, Option<&String>) { + (self.range.0.as_ref(), self.range.1.as_ref()) } pub fn size(&self) -> usize { - self.range.0.len() + self.range.1.len() + std::mem::size_of::() + // TODO!!!! + 0 //self.range.0.len() + self.range.1.len() + std::mem::size_of::() } } diff --git a/delorean_mem_qe/src/encoding.rs b/delorean_mem_qe/src/encoding.rs index e06dac8b3d..283398223f 100644 --- a/delorean_mem_qe/src/encoding.rs +++ b/delorean_mem_qe/src/encoding.rs @@ -60,10 +60,10 @@ impl From<&[f64]> for PlainFixed { #[derive(Debug, Default)] pub struct DictionaryRLE { // stores the mapping between an entry and its assigned index. - entry_index: BTreeMap, + entry_index: BTreeMap, usize>, // stores the mapping between an index and its entry. - index_entry: BTreeMap, + index_entry: BTreeMap>, map_size: usize, // TODO(edd) this isn't perfect at all @@ -88,12 +88,16 @@ impl DictionaryRLE { } pub fn push(&mut self, v: &str) { - self.push_additional(v, 1); + self.push_additional(Some(v.to_owned()), 1); } - pub fn push_additional(&mut self, v: &str, additional: u64) { + pub fn push_none(&mut self) { + self.push_additional(None, 1); + } + + pub fn push_additional(&mut self, v: Option, additional: u64) { self.total += additional; - let idx = self.entry_index.get(v); + let idx = self.entry_index.get(&v); match idx { Some(idx) => { if let Some((last_idx, rl)) = self.run_lengths.last_mut() { @@ -112,9 +116,12 @@ impl DictionaryRLE { if idx.is_none() { let idx = self.entry_index.len(); - self.entry_index.insert(String::from(v), idx); - self.index_entry.insert(idx, String::from(v)); - self.map_size += v.len() + std::mem::size_of::(); + self.entry_index.insert(v.clone(), idx); + if let Some(value) = &v { + self.map_size += value.len(); + } + self.index_entry.insert(idx, v); + self.map_size += 8 + std::mem::size_of::(); // TODO(edd): clean this option size up self.run_lengths.push((idx, additional)); self.run_length_size += std::mem::size_of::<(usize, u64)>(); @@ -126,9 +133,9 @@ impl DictionaryRLE { // row_ids returns an iterator over the set of row ids matching the provided // value. - pub fn row_ids(&self, value: &str) -> impl iter::Iterator { + pub fn row_ids(&self, value: Option) -> impl iter::Iterator { let mut out: Vec = vec![]; - if let Some(idx) = self.entry_index.get(value) { + if let Some(idx) = self.entry_index.get(&value) { let mut index: usize = 0; for (other_idx, other_rl) in &self.run_lengths { let start = index; @@ -143,9 +150,9 @@ impl DictionaryRLE { // row_ids returns an iterator over the set of row ids matching the provided // value. - pub fn row_ids_roaring(&self, value: &str) -> croaring::Bitmap { + pub fn row_ids_roaring(&self, value: Option) -> croaring::Bitmap { let mut bm = croaring::Bitmap::create(); - if let Some(idx) = self.entry_index.get(value) { + if let Some(idx) = self.entry_index.get(&value) { let mut index: u64 = 0; for (other_idx, other_rl) in &self.run_lengths { let start = index; @@ -179,11 +186,11 @@ impl DictionaryRLE { // unreachable!("for now"); // } - pub fn dictionary(&self) -> BTreeSet { + pub fn dictionary(&self) -> BTreeSet> { self.entry_index .keys() .cloned() - .collect::>() + .collect::>>() } // get the logical value at the provided index, or None if there is no value @@ -193,7 +200,12 @@ impl DictionaryRLE { let mut total = 0; for (idx, rl) in &self.run_lengths { if total + rl > index as u64 { - return self.index_entry.get(idx); + // TODO(edd): Can this really be idiomatic??? + match self.index_entry.get(idx) { + Some(&Some(ref result)) => return Some(result), + Some(&None) => return None, + None => return None, + } } total += rl; } @@ -203,19 +215,20 @@ impl DictionaryRLE { // values materialises a vector of references to all logical values in the // encoding. - pub fn values(&mut self) -> Vec<&str> { - let mut out = Vec::with_capacity(self.total as usize); + pub fn values(&mut self) -> Vec> { + let mut out: Vec> = Vec::with_capacity(self.total as usize); // build reverse mapping. let mut idx_value = BTreeMap::new(); for (k, v) in &self.entry_index { - idx_value.insert(v, k.as_str()); + idx_value.insert(v, k); } assert_eq!(idx_value.len(), self.entry_index.len()); for (idx, rl) in &self.run_lengths { - let &v = idx_value.get(&idx).unwrap(); - out.extend(iter::repeat(&v).take(*rl as usize)); + // TODO(edd): fix unwrap - we know that the value exists in map... + let v = idx_value.get(&idx).unwrap().as_ref(); + out.extend(iter::repeat(v).take(*rl as usize)); } out } @@ -261,17 +274,34 @@ mod test { drle.push("world"); drle.push("hello"); drle.push("hello"); - drle.push_additional("hello", 1); + drle.push_additional(Some("hello".to_string()), 1); assert_eq!( drle.values(), - ["hello", "hello", "world", "hello", "hello", "hello",] + [ + Some(&"hello".to_string()), + Some(&"hello".to_string()), + Some(&"world".to_string()), + Some(&"hello".to_string()), + Some(&"hello".to_string()), + Some(&"hello".to_string()) + ] ); - drle.push_additional("zoo", 3); + drle.push_additional(Some("zoo".to_string()), 3); assert_eq!( drle.values(), - ["hello", "hello", "world", "hello", "hello", "hello", "zoo", "zoo", "zoo"] + [ + Some(&"hello".to_string()), + Some(&"hello".to_string()), + Some(&"world".to_string()), + Some(&"hello".to_string()), + Some(&"hello".to_string()), + Some(&"hello".to_string()), + Some(&"zoo".to_string()), + Some(&"zoo".to_string()), + Some(&"zoo".to_string()), + ] ); assert_eq!(drle.value(0).unwrap(), "hello"); @@ -288,34 +318,49 @@ mod test { #[test] fn row_ids() { let mut drle = super::DictionaryRLE::new(); - drle.push_additional("abc", 3); - drle.push_additional("dre", 2); + drle.push_additional(Some("abc".to_string()), 3); + drle.push_additional(Some("dre".to_string()), 2); drle.push("abc"); - let ids = drle.row_ids("abc").collect::>(); + let ids = drle + .row_ids(Some("abc".to_string())) + .collect::>(); assert_eq!(ids, vec![0, 1, 2, 5]); - let ids = drle.row_ids("dre").collect::>(); + let ids = drle + .row_ids(Some("dre".to_string())) + .collect::>(); assert_eq!(ids, vec![3, 4]); - let ids = drle.row_ids("foo").collect::>(); + let ids = drle + .row_ids(Some("foo".to_string())) + .collect::>(); assert_eq!(ids, vec![]); } #[test] fn row_ids_roaring() { let mut drle = super::DictionaryRLE::new(); - drle.push_additional("abc", 3); - drle.push_additional("dre", 2); + drle.push_additional(Some("abc".to_string()), 3); + drle.push_additional(Some("dre".to_string()), 2); drle.push("abc"); - let ids = drle.row_ids_roaring("abc").iter().collect::>(); + let ids = drle + .row_ids_roaring(Some("abc".to_string())) + .iter() + .collect::>(); assert_eq!(ids, vec![0, 1, 2, 5]); - let ids = drle.row_ids_roaring("dre").iter().collect::>(); + let ids = drle + .row_ids_roaring(Some("dre".to_string())) + .iter() + .collect::>(); assert_eq!(ids, vec![3, 4]); - let ids = drle.row_ids_roaring("foo").iter().collect::>(); + let ids = drle + .row_ids_roaring(Some("foo".to_string())) + .iter() + .collect::>(); assert_eq!(ids, vec![]); } } diff --git a/delorean_mem_qe/src/segment.rs b/delorean_mem_qe/src/segment.rs index 1f1084c0e8..4b4a58289a 100644 --- a/delorean_mem_qe/src/segment.rs +++ b/delorean_mem_qe/src/segment.rs @@ -87,6 +87,24 @@ impl Segment { } } +/// Meta data for a segment. This data is mainly used to determine if a segment +/// may contain value for answering a query. +#[derive(Debug, Default)] +pub struct SegmentMetaData { + size: usize, // TODO + rows: usize, + + column_names: Vec, + time_range: (i64, i64), + // TODO column sort order +} + +impl SegmentMetaData { + pub fn overlaps_time_range(&self, from: i64, to: i64) -> bool { + self.time_range.0 <= to && from <= self.time_range.1 + } +} + pub struct Segments<'a> { segments: Vec<&'a Segment>, } @@ -100,6 +118,7 @@ impl<'a> Segments<'a> { let mut segments: Vec<&Segment> = vec![]; for segment in &self.segments { if segment.meta.overlaps_time_range(min, max) { + println!("Segement {:?} overlaps", segment.meta); segments.push(segment); } } @@ -131,7 +150,7 @@ impl<'a> Segments<'a> { let mut min_min: Option = None; for segment in &self.segments { if let Some(i) = segment.column_names().iter().position(|c| c == column_name) { - let min = Some(segment.columns[i].min()); + let min = segment.columns[i].min(); if min_min.is_none() { min_min = min } else if min_min > min { @@ -152,7 +171,7 @@ impl<'a> Segments<'a> { let mut max_max: Option = None; for segment in &self.segments { if let Some(i) = segment.column_names().iter().position(|c| c == column_name) { - let max = Some(segment.columns[i].max()); + let max = segment.columns[i].max(); if max_max.is_none() { max_max = max } else if max_max < max { @@ -231,28 +250,5 @@ impl<'a> Segments<'a> { } } -/// Meta data for a segment. This data is mainly used to determine if a segment -/// may contain value for answering a query. -#[derive(Debug, Default)] -pub struct SegmentMetaData { - size: usize, // TODO - rows: usize, - - column_names: Vec, - time_range: (i64, i64), - // TODO column sort order -} - -impl SegmentMetaData { - pub fn overlaps_time_range(&self, from: i64, to: i64) -> bool { - let result = self.time_range.0 <= to && from <= self.time_range.1; - println!( - "segment with ({:?}) overlaps ({:?}, {:?}) -- {:?}", - self.time_range, from, to, result - ); - result - } -} - #[cfg(test)] mod test {}