refactor: all read buffer tests passing

pull/24376/head
Edd Robinson 2021-05-13 10:42:58 +01:00
parent 7525f6e9e3
commit 0cf445991e
3 changed files with 354 additions and 32 deletions

View File

@ -15,7 +15,7 @@ use arrow::array::Array;
use crate::schema::LogicalDataType;
use crate::value::{EncodedValues, OwnedValue, Scalar, Value, Values};
use boolean::BooleanEncoding;
use encoding::{bool, scalar};
use encoding::bool;
use float::FloatEncoding;
use integer::IntegerEncoding;
use string::StringEncoding;
@ -1088,13 +1088,13 @@ impl From<arrow::array::Float64Array> for Column {
_ => unreachable!("min/max must both be Some or None"),
};
let data = scalar::FixedNull::<arrow::datatypes::Float64Type>::from(arr);
let data = FloatEncoding::from(arr);
let meta = MetaData {
range,
..MetaData::default()
};
Self::Float(meta, FloatEncoding::FixedNull64(data))
Self::Float(meta, data)
}
}
@ -1285,6 +1285,7 @@ impl RowIDs {
}
}
// Add all row IDs in the domain `[from, to)` to the collection.
pub fn add_range(&mut self, from: u32, to: u32) {
match self {
Self::Bitmap(ids) => ids.add_range(from as u64..to as u64),

View File

@ -1,3 +1,8 @@
use arrow::{
array::{Array, PrimitiveArray},
datatypes::Float64Type,
};
use crate::column::cmp;
use crate::column::RowIDs;
use std::{
@ -29,6 +34,13 @@ where
// when `T` is a `NonZeroX` then we get the niche optimisation
// that makes `Option<T>` the same size as `T`.
//
// For floats I plan to implement a check that will allow us to
// store them as integers if they're natural numbers, then we can
// make use of `Option<NonZeroX>` with no option overhead. If
// they're not all natural numbers then I think we could make a
// `NonZeroNaN` and perhaps figure out how to get `Option<NonZeroNan>`
// to have no overhead.
//
// TODO(edd): another obvious thing that might be worth doing in the future
// is to store all the run-lengths contiguously rather than in
// `Vec<(,)>`. One advantage of this is that it is possible to
@ -70,7 +82,9 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
/// A reasonable estimation of the on-heap size this encoding takes up.
pub fn size(&self) -> usize {
todo!()
size_of::<Vec<(u32, Option<T>)>>() // run length container size
+ (self.run_lengths.len() * size_of::<(u32, Option<T>)>()) // run lengths
+ size_of::<u32>()+size_of::<u32>() // null count, num rows
}
/// The estimated total size in bytes of the underlying values in the
@ -106,13 +120,6 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
self.null_count() > 0
}
/// Determines if the column contains a non-null value.
///
/// TODO: remove this and just use contains_null().
pub fn has_any_non_null_value(&self) -> bool {
!self.contains_null()
}
//
//
// ---- Helper methods for constructing an instance of the encoding
@ -182,8 +189,13 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
/// Populates the provided destination container with the row ids satisfying
/// the provided predicate.
pub fn row_ids_filter(&self, _value: T, _op: &cmp::Operator, _dst: RowIDs) -> RowIDs {
todo!()
pub fn row_ids_filter(&self, value: T, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
match op {
cmp::Operator::Equal | cmp::Operator::NotEqual => self.row_ids_equal(value, op, dst),
cmp::Operator::LT | cmp::Operator::LTE | cmp::Operator::GT | cmp::Operator::GTE => {
self.row_ids_cmp(value, op, dst)
}
}
}
/// Returns the set of row ids that satisfy a pair of binary operators
@ -218,14 +230,91 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
}
}
// Finds row ids based on = or != operator.
fn row_ids_equal(&self, _value: T, _op: &cmp::Operator, mut _dst: RowIDs) -> RowIDs {
todo!()
// Finds row ids for non-null values, based on = or != operator.
//
// TODO(edd): predicate filtering could be improved here because we need to
// effectively check all the run-lengths to find any that match
// the predicate. Performance will be proportional to the number
// of run-lengths in the column. For scalar columns where
// predicates are less common for out use-cases this is probably
// OK for a while. Improvements: (1) we could add bitsets of
// pre-computed rows for each distinct value, which is what we
// do for string RLE where equality predicates are very common,
// or (2) we could add a sparse index to allow us to jump to the
// run-lengths that contain matching logical row_ids.
fn row_ids_equal(&self, value: T, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
dst.clear();
if self.num_rows() == 0 {
return dst;
}
// Finds row ids based on <, <=, > or >= operator.
fn row_ids_cmp(&self, _value: T, _op: &cmp::Operator, mut _dst: RowIDs) -> RowIDs {
todo!()
match op {
cmp::Operator::Equal => {
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
if matches!(next.partial_cmp(&value), Some(Ordering::Equal)) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
}
curr_logical_row_id += rl;
}
}
cmp::Operator::NotEqual => {
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
if !matches!(next.partial_cmp(&value), Some(Ordering::Equal)) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
}
curr_logical_row_id += rl;
}
}
_ => unreachable!("invalid operator"),
}
dst
}
// Finds row ids for non-null values based on <, <=, > or >= operator.
// TODO(edd): predicate filtering could be improved here because we need to
// effectively check all the run-lengths to find any that match
// the predicate. Performance will be proportional to the number
// of run-lengths in the column. For scalar columns where
// predicates are less common for out use-cases this is probably
// OK for a while. Improvements: (1) we could add bitsets of
// pre-computed rows for each distinct value, which is what we
// do for string RLE where predicates are very common.
fn row_ids_cmp(&self, value: T, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
dst.clear();
if self.num_rows() == 0 {
return dst;
}
// Given an operator `cmp` will be set to a predicate function for the
// same operator
let cmp = match op {
cmp::Operator::GT => PartialOrd::gt,
cmp::Operator::GTE => PartialOrd::ge,
cmp::Operator::LT => PartialOrd::lt,
cmp::Operator::LTE => PartialOrd::le,
op => unreachable!("{:?} is an invalid operator", op),
};
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
if cmp(next, &value) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
}
curr_logical_row_id += rl;
}
dst
}
// Special case function for finding all rows that satisfy two operators on
@ -351,8 +440,9 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
/// a logical value.
///
pub fn values(&self, row_ids: &[u32], mut dst: Vec<Option<T>>) -> Vec<Option<T>> {
assert!(!row_ids.is_empty(), "no row IDs provided");
assert!(
row_ids.len() < self.num_rows() as usize,
row_ids.len() <= self.num_rows() as usize,
"more row_ids {:?} than rows {:?}",
row_ids.len(),
self.num_rows()
@ -411,12 +501,57 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
dst
}
/// Returns true if a non-null value exists at any of the row ids.
///
/// TODO(edd): this needs implementing when we push down NULL predicate
/// support.
pub fn has_non_null_value(&self, _row_ids: &[u32]) -> bool {
todo!()
// /// Returns true if a non-null value exists at any of the row ids.
pub fn has_non_null_value(&self, row_ids: &[u32]) -> bool {
assert!(!row_ids.is_empty(), "no row IDs provided");
// Ensure row ids ordered
debug_assert!(self.check_row_ids_ordered(row_ids));
assert!(
row_ids.len() <= self.num_rows() as usize,
"more row_ids {:?} than rows {:?}",
row_ids.len(),
self.num_rows()
);
let mut curr_logical_row_id = 0;
let (mut curr_entry_rl, mut curr_value) = self.run_lengths[0];
let mut i = 1;
for &row_id in row_ids {
assert!(
row_id < self.num_rows(),
"row_id {:?} beyond max row {:?}",
row_id,
self.num_rows() - 1
);
// find correctly logical row for `row_id`.
while curr_logical_row_id + curr_entry_rl <= row_id {
// this encoded entry does not cover the row we need.
// move on to next entry
curr_logical_row_id += curr_entry_rl;
curr_entry_rl = self.run_lengths[i].0;
curr_value = self.run_lengths[i].1;
i += 1;
}
if curr_value.is_some() {
return true;
}
curr_logical_row_id += 1; // move forwards a logical row
curr_entry_rl -= 1;
}
// all provide row IDs contain NULL values
false
}
/// Determines if the column contains a non-null value.
pub fn has_any_non_null_value(&self) -> bool {
self.null_count() < self.num_rows()
}
//
@ -547,8 +682,40 @@ where
}
}
// Build an RLE encoded column for a given Arrow Array.
impl From<PrimitiveArray<Float64Type>> for RLE<f64> {
fn from(arr: PrimitiveArray<Float64Type>) -> Self {
if arr.is_empty() {
return Self::default();
} else if arr.len() == 1 {
let mut enc = Self::default();
enc.push_additional((!arr.is_null(0)).then(|| arr.value(0)), 1);
return enc;
}
let mut enc = Self::default();
let (mut rl, mut v) = (1, (!arr.is_null(0)).then(|| arr.value(0)));
for next in arr.iter().skip(1) {
if next == v {
rl += 1;
continue;
}
enc.push_additional(v, rl);
rl = 1;
v = next;
}
// push the final run length
enc.push_additional(v, rl);
enc
}
}
#[cfg(test)]
mod test {
use cmp::Operator;
use super::*;
#[test]
@ -654,7 +821,20 @@ mod test {
}
#[test]
fn size() {}
fn size() {
let mut enc: RLE<i64> = RLE::default();
// 24b container + (0 rl * 24) + 4 + 4 = 32
assert_eq!(enc.size(), 32);
enc.push_none();
// 24b container + (1 rl * 24) + 4 + 4 = 32
assert_eq!(enc.size(), 56);
enc.push_additional_some(1, 10);
// 24b container + (2 rl * 24) + 4 + 4 = 32
assert_eq!(enc.size(), 80);
}
#[test]
fn value() {
@ -746,13 +926,152 @@ mod test {
}
#[test]
fn row_ids_filter_eq() {}
fn has_non_null_value() {
let mut enc: RLE<f64> = RLE::default();
enc.push_none();
assert!(!enc.has_non_null_value(&[0]));
enc.push_additional(Some(45.2), 3);
assert!(!enc.has_non_null_value(&[0]));
assert!(enc.has_non_null_value(&[0, 1]));
assert!(enc.has_non_null_value(&[2, 3]));
assert!(enc.has_non_null_value(&[0, 1, 2, 3]));
enc.push_additional(None, 3);
assert!(!enc.has_non_null_value(&[0, 5]));
assert!(!enc.has_non_null_value(&[6]));
// NULL, 45.2, 45.2, 45.2, NULL, NULL, NULL, 19.3
enc.push(19.3);
assert!(enc.has_non_null_value(&[3, 7]));
assert!(enc.has_non_null_value(&[7]));
}
#[test]
fn row_ids_filter_neq() {}
fn has_any_non_null_value() {
let mut enc: RLE<f64> = RLE::default();
assert!(!enc.has_any_non_null_value());
enc.push_none();
assert!(!enc.has_any_non_null_value());
enc.push_additional(None, 3);
assert!(!enc.has_any_non_null_value());
enc.push(22.34);
assert!(enc.has_any_non_null_value());
enc.push_additional(None, 3);
assert!(enc.has_any_non_null_value());
}
#[test]
fn row_ids_filter_lt() {}
fn row_ids_filter_eq() {
// NULL, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, NULL, 19, 2, 2
let mut enc: RLE<u8> = RLE::default();
enc.push_none();
enc.push_additional_some(2, 10);
enc.push_none();
enc.push_additional_some(19, 1);
enc.push_additional_some(2, 2);
let dst = enc.row_ids_filter(22, &Operator::Equal, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty());
let dst = enc.row_ids_filter(2, &Operator::Equal, RowIDs::new_vector());
assert_eq!(
dst.unwrap_vector(),
&vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14]
);
let dst = enc.row_ids_filter(19, &Operator::Equal, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![12]);
// Test all null encoding
let mut enc: RLE<u8> = RLE::default();
enc.push_additional_none(10);
let dst = enc.row_ids_filter(2, &Operator::Equal, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty());
}
#[test]
fn row_ids_filter_neq() {
// NULL, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, NULL, 19.0, 2.000000001
let mut enc: RLE<f64> = RLE::default();
enc.push_none();
enc.push_additional_some(2.0, 10);
enc.push_none();
enc.push_additional_some(19.0, 1);
enc.push_additional_some(2.000000001, 1);
let dst = enc.row_ids_filter(2.0, &Operator::NotEqual, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![12, 13]);
let dst = enc.row_ids_filter(2.000000001, &Operator::NotEqual, RowIDs::new_vector());
assert_eq!(
dst.unwrap_vector(),
&vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12]
);
// Test all null encoding
let mut enc: RLE<i8> = RLE::default();
enc.push_additional_none(10);
let dst = enc.row_ids_filter(2, &Operator::NotEqual, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty()); // NULL values do not match !=
// Test NaN behaviour
let mut enc: RLE<f64> = RLE::default();
enc.push_additional_none(3);
enc.push(22.3);
enc.push_additional_some(f64::NAN, 1);
enc.push_additional_some(f64::NEG_INFINITY, 1);
enc.push_additional_some(f64::INFINITY, 1);
// All non-null rows match !- 2.0 including the NaN / +- ∞
let dst = enc.row_ids_filter(2.0, &Operator::NotEqual, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![3, 4, 5, 6]);
}
#[test]
fn row_ids_filter_lt() {
// NULL, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, NULL, 19.0, 2.000000001
let mut enc: RLE<f64> = RLE::default();
enc.push_none();
enc.push_additional_some(2.0, 10);
enc.push_none();
enc.push_additional_some(19.0, 1);
enc.push_additional_some(2.000000001, 1);
let dst = enc.row_ids_filter(2.0, &Operator::LT, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty());
let dst = enc.row_ids_filter(2.0000000000001, &Operator::LT, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let dst = enc.row_ids_filter(2.2, &Operator::LT, RowIDs::new_vector());
assert_eq!(
dst.unwrap_vector(),
&vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
);
// // Test all null encoding
// let mut enc: RLE<i8> = RLE::default();
// enc.push_additional_none(10);
// let dst = enc.row_ids_filter(2, &Operator::LT, RowIDs::new_vector());
// assert!(dst.unwrap_vector().is_empty()); // NULL values do not match !=
// // Test NaN behaviour
// let mut enc: RLE<f64> = RLE::default();
// enc.push_additional_none(3);
// enc.push(22.3);
// enc.push_additional_some(f64::NAN, 1);
// enc.push_additional_some(f64::NEG_INFINITY, 1);
// enc.push_additional_some(f64::INFINITY, 1);
// // All non-null rows match !- 2.0 including the NaN / +- ∞
// let dst = enc.row_ids_filter(2.0, &Operator::LT, RowIDs::new_vector());
// assert_eq!(dst.unwrap_vector(), &vec![3, 4, 5, 6]);
}
#[test]
fn row_ids_filter_lte() {}

View File

@ -11,6 +11,7 @@ use super::{cmp, Statistics};
use crate::column::{RowIDs, Scalar, Value, Values};
#[allow(clippy::upper_case_acronyms)] // TODO(edd): these will be OK in 1.52
#[derive(Debug)]
pub enum FloatEncoding {
Fixed64(Fixed<f64>),
FixedNull64(FixedNull<arrow::datatypes::Float64Type>),
@ -320,8 +321,9 @@ impl From<arrow::array::Float64Array> for FloatEncoding {
}
// TODO(edd) Right now let's just RLE encode the column if it is 50% NULL.
if arr.null_count() >= arr.len() / 2 {
return Self::RLE64(RLE::from(arr.values()));
// and has at least 100 values in it.
if arr.len() >= 100 && arr.null_count() >= arr.len() / 2 {
return Self::RLE64(RLE::from(arr));
}
Self::FixedNull64(FixedNull::<arrow::datatypes::Float64Type>::from(arr))