From 0cf445991efe88c6a2a659368cc5c587f9d81504 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 13 May 2021 10:42:58 +0100 Subject: [PATCH] refactor: all read buffer tests passing --- read_buffer/src/column.rs | 7 +- read_buffer/src/column/encoding/scalar/rle.rs | 373 ++++++++++++++++-- read_buffer/src/column/float.rs | 6 +- 3 files changed, 354 insertions(+), 32 deletions(-) diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 2a78bc0309..6109ccc19b 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -15,7 +15,7 @@ use arrow::array::Array; use crate::schema::LogicalDataType; use crate::value::{EncodedValues, OwnedValue, Scalar, Value, Values}; use boolean::BooleanEncoding; -use encoding::{bool, scalar}; +use encoding::bool; use float::FloatEncoding; use integer::IntegerEncoding; use string::StringEncoding; @@ -1088,13 +1088,13 @@ impl From for Column { _ => unreachable!("min/max must both be Some or None"), }; - let data = scalar::FixedNull::::from(arr); + let data = FloatEncoding::from(arr); let meta = MetaData { range, ..MetaData::default() }; - Self::Float(meta, FloatEncoding::FixedNull64(data)) + Self::Float(meta, data) } } @@ -1285,6 +1285,7 @@ impl RowIDs { } } + // Add all row IDs in the domain `[from, to)` to the collection. pub fn add_range(&mut self, from: u32, to: u32) { match self { Self::Bitmap(ids) => ids.add_range(from as u64..to as u64), diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index bcc648b16a..86e3ca4067 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -1,3 +1,8 @@ +use arrow::{ + array::{Array, PrimitiveArray}, + datatypes::Float64Type, +}; + use crate::column::cmp; use crate::column::RowIDs; use std::{ @@ -29,6 +34,13 @@ where // when `T` is a `NonZeroX` then we get the niche optimisation // that makes `Option` the same size as `T`. // + // For floats I plan to implement a check that will allow us to + // store them as integers if they're natural numbers, then we can + // make use of `Option` with no option overhead. If + // they're not all natural numbers then I think we could make a + // `NonZeroNaN` and perhaps figure out how to get `Option` + // to have no overhead. + // // TODO(edd): another obvious thing that might be worth doing in the future // is to store all the run-lengths contiguously rather than in // `Vec<(,)>`. One advantage of this is that it is possible to @@ -70,7 +82,9 @@ impl RLE { /// A reasonable estimation of the on-heap size this encoding takes up. pub fn size(&self) -> usize { - todo!() + size_of::)>>() // run length container size + + (self.run_lengths.len() * size_of::<(u32, Option)>()) // run lengths + + size_of::()+size_of::() // null count, num rows } /// The estimated total size in bytes of the underlying values in the @@ -106,13 +120,6 @@ impl RLE { self.null_count() > 0 } - /// Determines if the column contains a non-null value. - /// - /// TODO: remove this and just use contains_null(). - pub fn has_any_non_null_value(&self) -> bool { - !self.contains_null() - } - // // // ---- Helper methods for constructing an instance of the encoding @@ -182,8 +189,13 @@ impl RLE { /// Populates the provided destination container with the row ids satisfying /// the provided predicate. - pub fn row_ids_filter(&self, _value: T, _op: &cmp::Operator, _dst: RowIDs) -> RowIDs { - todo!() + pub fn row_ids_filter(&self, value: T, op: &cmp::Operator, dst: RowIDs) -> RowIDs { + match op { + cmp::Operator::Equal | cmp::Operator::NotEqual => self.row_ids_equal(value, op, dst), + cmp::Operator::LT | cmp::Operator::LTE | cmp::Operator::GT | cmp::Operator::GTE => { + self.row_ids_cmp(value, op, dst) + } + } } /// Returns the set of row ids that satisfy a pair of binary operators @@ -218,14 +230,91 @@ impl RLE { } } - // Finds row ids based on = or != operator. - fn row_ids_equal(&self, _value: T, _op: &cmp::Operator, mut _dst: RowIDs) -> RowIDs { - todo!() + // Finds row ids for non-null values, based on = or != operator. + // + // TODO(edd): predicate filtering could be improved here because we need to + // effectively check all the run-lengths to find any that match + // the predicate. Performance will be proportional to the number + // of run-lengths in the column. For scalar columns where + // predicates are less common for out use-cases this is probably + // OK for a while. Improvements: (1) we could add bitsets of + // pre-computed rows for each distinct value, which is what we + // do for string RLE where equality predicates are very common, + // or (2) we could add a sparse index to allow us to jump to the + // run-lengths that contain matching logical row_ids. + fn row_ids_equal(&self, value: T, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs { + dst.clear(); + + if self.num_rows() == 0 { + return dst; + } + + match op { + cmp::Operator::Equal => { + let mut curr_logical_row_id = 0; + for (rl, next) in &self.run_lengths { + if let Some(next) = next { + if matches!(next.partial_cmp(&value), Some(Ordering::Equal)) { + dst.add_range(curr_logical_row_id, curr_logical_row_id + rl); + } + } + curr_logical_row_id += rl; + } + } + cmp::Operator::NotEqual => { + let mut curr_logical_row_id = 0; + for (rl, next) in &self.run_lengths { + if let Some(next) = next { + if !matches!(next.partial_cmp(&value), Some(Ordering::Equal)) { + dst.add_range(curr_logical_row_id, curr_logical_row_id + rl); + } + } + curr_logical_row_id += rl; + } + } + _ => unreachable!("invalid operator"), + } + + dst } - // Finds row ids based on <, <=, > or >= operator. - fn row_ids_cmp(&self, _value: T, _op: &cmp::Operator, mut _dst: RowIDs) -> RowIDs { - todo!() + // Finds row ids for non-null values based on <, <=, > or >= operator. + // TODO(edd): predicate filtering could be improved here because we need to + // effectively check all the run-lengths to find any that match + // the predicate. Performance will be proportional to the number + // of run-lengths in the column. For scalar columns where + // predicates are less common for out use-cases this is probably + // OK for a while. Improvements: (1) we could add bitsets of + // pre-computed rows for each distinct value, which is what we + // do for string RLE where predicates are very common. + fn row_ids_cmp(&self, value: T, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs { + dst.clear(); + + if self.num_rows() == 0 { + return dst; + } + + // Given an operator `cmp` will be set to a predicate function for the + // same operator + let cmp = match op { + cmp::Operator::GT => PartialOrd::gt, + cmp::Operator::GTE => PartialOrd::ge, + cmp::Operator::LT => PartialOrd::lt, + cmp::Operator::LTE => PartialOrd::le, + op => unreachable!("{:?} is an invalid operator", op), + }; + + let mut curr_logical_row_id = 0; + for (rl, next) in &self.run_lengths { + if let Some(next) = next { + if cmp(next, &value) { + dst.add_range(curr_logical_row_id, curr_logical_row_id + rl); + } + } + curr_logical_row_id += rl; + } + + dst } // Special case function for finding all rows that satisfy two operators on @@ -351,8 +440,9 @@ impl RLE { /// a logical value. /// pub fn values(&self, row_ids: &[u32], mut dst: Vec>) -> Vec> { + assert!(!row_ids.is_empty(), "no row IDs provided"); assert!( - row_ids.len() < self.num_rows() as usize, + row_ids.len() <= self.num_rows() as usize, "more row_ids {:?} than rows {:?}", row_ids.len(), self.num_rows() @@ -411,12 +501,57 @@ impl RLE { dst } - /// Returns true if a non-null value exists at any of the row ids. - /// - /// TODO(edd): this needs implementing when we push down NULL predicate - /// support. - pub fn has_non_null_value(&self, _row_ids: &[u32]) -> bool { - todo!() + // /// Returns true if a non-null value exists at any of the row ids. + pub fn has_non_null_value(&self, row_ids: &[u32]) -> bool { + assert!(!row_ids.is_empty(), "no row IDs provided"); + // Ensure row ids ordered + debug_assert!(self.check_row_ids_ordered(row_ids)); + + assert!( + row_ids.len() <= self.num_rows() as usize, + "more row_ids {:?} than rows {:?}", + row_ids.len(), + self.num_rows() + ); + + let mut curr_logical_row_id = 0; + let (mut curr_entry_rl, mut curr_value) = self.run_lengths[0]; + + let mut i = 1; + for &row_id in row_ids { + assert!( + row_id < self.num_rows(), + "row_id {:?} beyond max row {:?}", + row_id, + self.num_rows() - 1 + ); + + // find correctly logical row for `row_id`. + while curr_logical_row_id + curr_entry_rl <= row_id { + // this encoded entry does not cover the row we need. + // move on to next entry + curr_logical_row_id += curr_entry_rl; + curr_entry_rl = self.run_lengths[i].0; + curr_value = self.run_lengths[i].1; + + i += 1; + } + + if curr_value.is_some() { + return true; + } + + curr_logical_row_id += 1; // move forwards a logical row + curr_entry_rl -= 1; + } + + // all provide row IDs contain NULL values + false + } + + /// Determines if the column contains a non-null value. + pub fn has_any_non_null_value(&self) -> bool { + self.null_count() < self.num_rows() } // @@ -547,8 +682,40 @@ where } } +// Build an RLE encoded column for a given Arrow Array. +impl From> for RLE { + fn from(arr: PrimitiveArray) -> Self { + if arr.is_empty() { + return Self::default(); + } else if arr.len() == 1 { + let mut enc = Self::default(); + enc.push_additional((!arr.is_null(0)).then(|| arr.value(0)), 1); + return enc; + } + + let mut enc = Self::default(); + let (mut rl, mut v) = (1, (!arr.is_null(0)).then(|| arr.value(0))); + for next in arr.iter().skip(1) { + if next == v { + rl += 1; + continue; + } + + enc.push_additional(v, rl); + rl = 1; + v = next; + } + + // push the final run length + enc.push_additional(v, rl); + enc + } +} + #[cfg(test)] mod test { + use cmp::Operator; + use super::*; #[test] @@ -654,7 +821,20 @@ mod test { } #[test] - fn size() {} + fn size() { + let mut enc: RLE = RLE::default(); + + // 24b container + (0 rl * 24) + 4 + 4 = 32 + assert_eq!(enc.size(), 32); + + enc.push_none(); + // 24b container + (1 rl * 24) + 4 + 4 = 32 + assert_eq!(enc.size(), 56); + + enc.push_additional_some(1, 10); + // 24b container + (2 rl * 24) + 4 + 4 = 32 + assert_eq!(enc.size(), 80); + } #[test] fn value() { @@ -746,13 +926,152 @@ mod test { } #[test] - fn row_ids_filter_eq() {} + fn has_non_null_value() { + let mut enc: RLE = RLE::default(); + enc.push_none(); + assert!(!enc.has_non_null_value(&[0])); + + enc.push_additional(Some(45.2), 3); + assert!(!enc.has_non_null_value(&[0])); + assert!(enc.has_non_null_value(&[0, 1])); + assert!(enc.has_non_null_value(&[2, 3])); + assert!(enc.has_non_null_value(&[0, 1, 2, 3])); + + enc.push_additional(None, 3); + assert!(!enc.has_non_null_value(&[0, 5])); + assert!(!enc.has_non_null_value(&[6])); + + // NULL, 45.2, 45.2, 45.2, NULL, NULL, NULL, 19.3 + enc.push(19.3); + assert!(enc.has_non_null_value(&[3, 7])); + assert!(enc.has_non_null_value(&[7])); + } #[test] - fn row_ids_filter_neq() {} + fn has_any_non_null_value() { + let mut enc: RLE = RLE::default(); + assert!(!enc.has_any_non_null_value()); + + enc.push_none(); + assert!(!enc.has_any_non_null_value()); + + enc.push_additional(None, 3); + assert!(!enc.has_any_non_null_value()); + + enc.push(22.34); + assert!(enc.has_any_non_null_value()); + + enc.push_additional(None, 3); + assert!(enc.has_any_non_null_value()); + } #[test] - fn row_ids_filter_lt() {} + fn row_ids_filter_eq() { + // NULL, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, NULL, 19, 2, 2 + let mut enc: RLE = RLE::default(); + enc.push_none(); + enc.push_additional_some(2, 10); + enc.push_none(); + enc.push_additional_some(19, 1); + enc.push_additional_some(2, 2); + + let dst = enc.row_ids_filter(22, &Operator::Equal, RowIDs::new_vector()); + assert!(dst.unwrap_vector().is_empty()); + + let dst = enc.row_ids_filter(2, &Operator::Equal, RowIDs::new_vector()); + assert_eq!( + dst.unwrap_vector(), + &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14] + ); + + let dst = enc.row_ids_filter(19, &Operator::Equal, RowIDs::new_vector()); + assert_eq!(dst.unwrap_vector(), &vec![12]); + + // Test all null encoding + let mut enc: RLE = RLE::default(); + enc.push_additional_none(10); + let dst = enc.row_ids_filter(2, &Operator::Equal, RowIDs::new_vector()); + assert!(dst.unwrap_vector().is_empty()); + } + + #[test] + fn row_ids_filter_neq() { + // NULL, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, NULL, 19.0, 2.000000001 + let mut enc: RLE = RLE::default(); + enc.push_none(); + enc.push_additional_some(2.0, 10); + enc.push_none(); + enc.push_additional_some(19.0, 1); + enc.push_additional_some(2.000000001, 1); + + let dst = enc.row_ids_filter(2.0, &Operator::NotEqual, RowIDs::new_vector()); + assert_eq!(dst.unwrap_vector(), &vec![12, 13]); + + let dst = enc.row_ids_filter(2.000000001, &Operator::NotEqual, RowIDs::new_vector()); + assert_eq!( + dst.unwrap_vector(), + &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12] + ); + + // Test all null encoding + let mut enc: RLE = RLE::default(); + enc.push_additional_none(10); + let dst = enc.row_ids_filter(2, &Operator::NotEqual, RowIDs::new_vector()); + assert!(dst.unwrap_vector().is_empty()); // NULL values do not match != + + // Test NaN behaviour + let mut enc: RLE = RLE::default(); + enc.push_additional_none(3); + enc.push(22.3); + enc.push_additional_some(f64::NAN, 1); + enc.push_additional_some(f64::NEG_INFINITY, 1); + enc.push_additional_some(f64::INFINITY, 1); + + // All non-null rows match !- 2.0 including the NaN / +- ∞ + let dst = enc.row_ids_filter(2.0, &Operator::NotEqual, RowIDs::new_vector()); + assert_eq!(dst.unwrap_vector(), &vec![3, 4, 5, 6]); + } + + #[test] + fn row_ids_filter_lt() { + // NULL, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, NULL, 19.0, 2.000000001 + let mut enc: RLE = RLE::default(); + enc.push_none(); + enc.push_additional_some(2.0, 10); + enc.push_none(); + enc.push_additional_some(19.0, 1); + enc.push_additional_some(2.000000001, 1); + + let dst = enc.row_ids_filter(2.0, &Operator::LT, RowIDs::new_vector()); + assert!(dst.unwrap_vector().is_empty()); + + let dst = enc.row_ids_filter(2.0000000000001, &Operator::LT, RowIDs::new_vector()); + assert_eq!(dst.unwrap_vector(), &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + let dst = enc.row_ids_filter(2.2, &Operator::LT, RowIDs::new_vector()); + assert_eq!( + dst.unwrap_vector(), + &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] + ); + + // // Test all null encoding + // let mut enc: RLE = RLE::default(); + // enc.push_additional_none(10); + // let dst = enc.row_ids_filter(2, &Operator::LT, RowIDs::new_vector()); + // assert!(dst.unwrap_vector().is_empty()); // NULL values do not match != + + // // Test NaN behaviour + // let mut enc: RLE = RLE::default(); + // enc.push_additional_none(3); + // enc.push(22.3); + // enc.push_additional_some(f64::NAN, 1); + // enc.push_additional_some(f64::NEG_INFINITY, 1); + // enc.push_additional_some(f64::INFINITY, 1); + + // // All non-null rows match !- 2.0 including the NaN / +- ∞ + // let dst = enc.row_ids_filter(2.0, &Operator::LT, RowIDs::new_vector()); + // assert_eq!(dst.unwrap_vector(), &vec![3, 4, 5, 6]); + } #[test] fn row_ids_filter_lte() {} diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs index c9b05a93e9..f3e67f30b8 100644 --- a/read_buffer/src/column/float.rs +++ b/read_buffer/src/column/float.rs @@ -11,6 +11,7 @@ use super::{cmp, Statistics}; use crate::column::{RowIDs, Scalar, Value, Values}; #[allow(clippy::upper_case_acronyms)] // TODO(edd): these will be OK in 1.52 +#[derive(Debug)] pub enum FloatEncoding { Fixed64(Fixed), FixedNull64(FixedNull), @@ -320,8 +321,9 @@ impl From for FloatEncoding { } // TODO(edd) Right now let's just RLE encode the column if it is 50% NULL. - if arr.null_count() >= arr.len() / 2 { - return Self::RLE64(RLE::from(arr.values())); + // and has at least 100 values in it. + if arr.len() >= 100 && arr.null_count() >= arr.len() / 2 { + return Self::RLE64(RLE::from(arr)); } Self::FixedNull64(FixedNull::::from(arr))