feat: implement predicate pushdown on RLE

pull/24376/head
Edd Robinson 2021-05-13 17:14:41 +01:00
parent 0cf445991e
commit 1ac949e7ea
3 changed files with 256 additions and 155 deletions

View File

@ -1,4 +1,4 @@
use std::convert::TryFrom;
use std::{convert::TryFrom, fmt::Display};
/// Possible comparison operators
#[derive(Debug, PartialEq, Copy, Clone)]
@ -12,6 +12,23 @@ pub enum Operator {
LTE,
}
impl Display for Operator {
    /// Writes the operator as its conventional comparison symbol, e.g.
    /// `Operator::GTE` is rendered as `>=`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let symbol = match self {
            Self::Equal => "=",
            Self::NotEqual => "!=",
            Self::GT => ">",
            Self::GTE => ">=",
            Self::LT => "<",
            Self::LTE => "<=",
        };
        f.write_str(symbol)
    }
}
impl TryFrom<&str> for Operator {
type Error = String;

View File

@ -93,7 +93,7 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
pub fn size_raw(&self, include_nulls: bool) -> usize {
let base_size = size_of::<Self>();
if include_nulls {
return base_size + (self.num_rows() as usize * size_of::<T>());
return base_size + (self.num_rows() as usize * size_of::<Option<T>>());
}
// remove NULL values from calculation
@ -101,7 +101,7 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
+ self
.run_lengths
.iter()
.filter_map(|(rl, v)| v.is_some().then(|| *rl as usize * size_of::<T>()))
.filter_map(|(rl, v)| v.is_some().then(|| *rl as usize * size_of::<Option<T>>()))
.sum::<usize>()
}
@ -189,9 +189,15 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
/// Populates the provided destination container with the row ids satisfying
/// the provided predicate.
///
/// TODO(edd): To get the correct behaviour when comparing against NaN, ∞ or -∞
/// need to implement some comparator functions for all the types of `T` we
/// plan to accept.
pub fn row_ids_filter(&self, value: T, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
match op {
cmp::Operator::Equal | cmp::Operator::NotEqual => self.row_ids_equal(value, op, dst),
cmp::Operator::Equal | cmp::Operator::NotEqual => {
self.row_ids_cmp_equal(value, op, dst)
}
cmp::Operator::LT | cmp::Operator::LTE | cmp::Operator::GT | cmp::Operator::GTE => {
self.row_ids_cmp(value, op, dst)
}
@ -206,6 +212,9 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
///
/// Essentially, this supports:
/// `x {>, >=, <, <=} value1 AND x {>, >=, <, <=} value2`.
///
/// Note: this doesn't currently support all possible combinations of
/// operators.
pub fn row_ids_filter_range(
&self,
left: (T, &cmp::Operator),
@ -220,7 +229,7 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
| (cmp::Operator::LT, cmp::Operator::GT)
| (cmp::Operator::LT, cmp::Operator::GTE)
| (cmp::Operator::LTE, cmp::Operator::GT)
| (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order(
| (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range(
(&left.0, Self::ord_from_op(&left.1)),
(&right.0, Self::ord_from_op(&right.1)),
dst,
@ -242,37 +251,32 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
// do for string RLE where equality predicates are very common,
// or (2) we could add a sparse index to allow us to jump to the
// run-lengths that contain matching logical row_ids.
fn row_ids_equal(&self, value: T, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
fn row_ids_cmp_equal(&self, value: T, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
dst.clear();
if self.num_rows() == 0 {
return dst;
}
match op {
// Given an operator `cmp` is a predicate function that applies it.
let cmp = match op {
cmp::Operator::Equal => {
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
if matches!(next.partial_cmp(&value), Some(Ordering::Equal)) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
}
curr_logical_row_id += rl;
}
|left: &T, right: &T| matches!(left.partial_cmp(right), Some(Ordering::Equal))
}
cmp::Operator::NotEqual => {
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
if !matches!(next.partial_cmp(&value), Some(Ordering::Equal)) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
}
curr_logical_row_id += rl;
|left: &T, right: &T| !matches!(left.partial_cmp(right), Some(Ordering::Equal))
}
op => unreachable!("{:?} is an invalid operator", op),
};
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
if cmp(next, &value) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
}
_ => unreachable!("invalid operator"),
curr_logical_row_id += rl;
}
dst
@ -294,8 +298,7 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
return dst;
}
// Given an operator `cmp` will be set to a predicate function for the
// same operator
// Given an operator `cmp` is a predicate function that applies it.
let cmp = match op {
cmp::Operator::GT => PartialOrd::gt,
cmp::Operator::GTE => PartialOrd::ge,
@ -321,26 +324,49 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
// two values.
//
// This function exists because it is more performant than calling
// `row_ids_cmp_order_bm` twice and predicates like `WHERE X > y and X <= x`
// are very common, e.g., for timestamp columns.
// `row_ids_filter` twice on predicates like `WHERE X > y and X <= x`
// which are very common, e.g., for timestamp columns.
//
// The method accepts two orderings for each predicate. If the predicate is
// `x < y` then the orderings provided should be
// `x < y` then the orderings provided will be
// `(Ordering::Less, Ordering::Less)`. This does lead to a slight overhead
// in checking non-matching values, but it means that the predicate `x <= y`
// can be supported by providing the ordering
// `(Ordering::Less, Ordering::Equal)`.
//
// For performance reasons ranges of matching values are collected up and
// added in bulk to the bitmap.
//
fn row_ids_cmp_range_order(
//.
fn row_ids_cmp_range(
&self,
_left: (&T, (std::cmp::Ordering, std::cmp::Ordering)),
_right: (&T, (std::cmp::Ordering, std::cmp::Ordering)),
mut _dst: RowIDs,
left: (&T, (Ordering, Ordering)),
right: (&T, (Ordering, Ordering)),
mut dst: RowIDs,
) -> RowIDs {
todo!()
dst.clear();
let left_op = left.1;
let right_op = right.1;
let mut curr_logical_row_id = 0;
for (rl, next) in &self.run_lengths {
if let Some(next) = next {
// compare next value against the left and right's values
let left_cmp_result = next.partial_cmp(left.0);
let right_cmp_result = next.partial_cmp(right.0);
// TODO(edd): eurgh I still don't understand how I got this to
// be correct. Need to revisit to make it simpler.
let left_result_ok =
!(left_cmp_result == Some(left_op.0) || left_cmp_result == Some(left_op.1));
let right_result_ok =
!(right_cmp_result == Some(right_op.0) || right_cmp_result == Some(right_op.1));
if !(left_result_ok || right_result_ok) {
dst.add_range(curr_logical_row_id, curr_logical_row_id + rl);
}
curr_logical_row_id += rl;
}
}
dst
}
// Helper function to convert comparison operators to cmp orderings.
@ -354,23 +380,6 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
}
}
/// Populates the provided destination container with the row ids for rows
/// that are null.
///
/// Delegates to `row_ids_is_null` with `true`.
pub fn row_ids_null(&self, dst: RowIDs) -> RowIDs {
self.row_ids_is_null(true, dst)
}
/// Populates the provided destination container with the row ids for rows
/// that are not null.
///
/// Delegates to `row_ids_is_null` with `false`.
pub fn row_ids_not_null(&self, dst: RowIDs) -> RowIDs {
self.row_ids_is_null(false, dst)
}
// All row ids that have either NULL or not NULL values.
//
// Callers pass `true` to request NULL rows and `false` to request non-NULL
// rows. Not implemented yet: calling this currently panics via `todo!()`.
fn row_ids_is_null(&self, _is_null: bool, mut _dst: RowIDs) -> RowIDs {
todo!()
}
//
//
// ---- Methods for getting materialised values from row IDs.
@ -836,6 +845,25 @@ mod test {
assert_eq!(enc.size(), 80);
}
#[test]
fn size_raw() {
    // Asserts the raw size first with NULL values included, then without.
    let check = |enc: &RLE<i64>, with_nulls: usize, without_nulls: usize| {
        assert_eq!(enc.size_raw(true), with_nulls);
        assert_eq!(enc.size_raw(false), without_nulls);
    };

    let mut enc: RLE<i64> = RLE::default();

    // Empty encoding: 32b Self + (0 * 16)
    check(&enc, 32, 32);

    // One NULL row: 32b Self + (1 * 16) = 48; the NULL is not counted when
    // NULL values are excluded.
    enc.push_none();
    check(&enc, 48, 32);

    // Ten more non-NULL rows: 32b Self + (11 * 16) = 208 including the NULL,
    // 32b Self + (10 * 16) = 192 excluding it.
    enc.push_additional_some(1, 10);
    check(&enc, 208, 192);
}
#[test]
fn value() {
let mut enc = RLE::default();
@ -966,7 +994,7 @@ mod test {
}
#[test]
fn row_ids_filter_eq() {
fn row_ids_filter() {
// NULL, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, NULL, 19, 2, 2
let mut enc: RLE<u8> = RLE::default();
enc.push_none();
@ -975,113 +1003,112 @@ mod test {
enc.push_additional_some(19, 1);
enc.push_additional_some(2, 2);
let dst = enc.row_ids_filter(22, &Operator::Equal, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty());
let all_non_null = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14];
let cases = vec![
(22, Operator::Equal, vec![]),
(22, Operator::NotEqual, all_non_null.clone()),
(22, Operator::LT, all_non_null.clone()),
(22, Operator::LTE, all_non_null.clone()),
(22, Operator::GT, vec![]),
(22, Operator::GTE, vec![]),
(0, Operator::Equal, vec![]),
(0, Operator::NotEqual, all_non_null.clone()),
(0, Operator::LT, vec![]),
(0, Operator::LTE, vec![]),
(0, Operator::GT, all_non_null.clone()),
(0, Operator::GTE, all_non_null.clone()),
(
2,
Operator::Equal,
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14],
),
(2, Operator::NotEqual, vec![12]),
(2, Operator::LT, vec![]),
(
2,
Operator::LTE,
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14],
),
(2, Operator::GT, vec![12]),
(2, Operator::GTE, all_non_null.clone()),
(19, Operator::Equal, vec![12]),
(
19,
Operator::NotEqual,
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14],
),
(
19,
Operator::LT,
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14],
),
(19, Operator::LTE, all_non_null),
(19, Operator::GT, vec![]),
(19, Operator::GTE, vec![12]),
];
let dst = enc.row_ids_filter(2, &Operator::Equal, RowIDs::new_vector());
assert_eq!(
dst.unwrap_vector(),
&vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14]
);
let dst = enc.row_ids_filter(19, &Operator::Equal, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![12]);
for (v, op, exp) in cases {
let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v);
}
}
#[test]
fn row_ids_filter_nulls() {
// Test all null encoding
let mut enc: RLE<u8> = RLE::default();
enc.push_additional_none(10);
let dst = enc.row_ids_filter(2, &Operator::Equal, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty());
let cases = vec![
(22, Operator::Equal),
(22, Operator::NotEqual),
(22, Operator::LT),
(22, Operator::LTE),
(22, Operator::GT),
(22, Operator::GTE),
];
for (v, op) in cases {
let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector());
assert!(
dst.unwrap_vector().is_empty(),
"example '{} {:?}' failed",
op,
v,
);
}
}
#[test]
fn row_ids_filter_neq() {
// NULL, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, NULL, 19.0, 2.000000001
let mut enc: RLE<f64> = RLE::default();
enc.push_none();
enc.push_additional_some(2.0, 10);
enc.push_none();
enc.push_additional_some(19.0, 1);
enc.push_additional_some(2.000000001, 1);
fn row_ids_filter_range() {
let v = RLE::from(vec![
100, 100, 101, 101, 101, 200, 300, 2030, 3, 101, 4, 5, 21, 100,
]);
let dst = enc.row_ids_filter(2.0, &Operator::NotEqual, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![12, 13]);
let cases = vec![
(
(100, &Operator::GTE),
(240, &Operator::LT),
vec![0, 1, 2, 3, 4, 5, 9, 13],
),
(
(100, &Operator::GT),
(240, &Operator::LT),
vec![2, 3, 4, 5, 9],
),
((10, &Operator::LT), (-100, &Operator::GT), vec![8, 10, 11]),
((21, &Operator::GTE), (21, &Operator::LTE), vec![12]),
(
(101, &Operator::GTE),
(2030, &Operator::LTE),
vec![2, 3, 4, 5, 6, 7, 9],
),
((10000, &Operator::LTE), (3999, &Operator::GT), vec![]),
];
let dst = enc.row_ids_filter(2.000000001, &Operator::NotEqual, RowIDs::new_vector());
assert_eq!(
dst.unwrap_vector(),
&vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12]
);
// Test all null encoding
let mut enc: RLE<i8> = RLE::default();
enc.push_additional_none(10);
let dst = enc.row_ids_filter(2, &Operator::NotEqual, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty()); // NULL values do not match !=
// Test NaN behaviour
let mut enc: RLE<f64> = RLE::default();
enc.push_additional_none(3);
enc.push(22.3);
enc.push_additional_some(f64::NAN, 1);
enc.push_additional_some(f64::NEG_INFINITY, 1);
enc.push_additional_some(f64::INFINITY, 1);
// All non-null rows match != 2.0 including the NaN / +- ∞
let dst = enc.row_ids_filter(2.0, &Operator::NotEqual, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![3, 4, 5, 6]);
for (left, right, exp) in cases {
let dst = v.row_ids_filter_range(left, right, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &exp);
}
}
#[test]
fn row_ids_filter_lt() {
// NULL, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, NULL, 19.0, 2.000000001
let mut enc: RLE<f64> = RLE::default();
enc.push_none();
enc.push_additional_some(2.0, 10);
enc.push_none();
enc.push_additional_some(19.0, 1);
enc.push_additional_some(2.000000001, 1);
let dst = enc.row_ids_filter(2.0, &Operator::LT, RowIDs::new_vector());
assert!(dst.unwrap_vector().is_empty());
let dst = enc.row_ids_filter(2.0000000000001, &Operator::LT, RowIDs::new_vector());
assert_eq!(dst.unwrap_vector(), &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let dst = enc.row_ids_filter(2.2, &Operator::LT, RowIDs::new_vector());
assert_eq!(
dst.unwrap_vector(),
&vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
);
// // Test all null encoding
// let mut enc: RLE<i8> = RLE::default();
// enc.push_additional_none(10);
// let dst = enc.row_ids_filter(2, &Operator::LT, RowIDs::new_vector());
// assert!(dst.unwrap_vector().is_empty()); // NULL values do not match !=
// // Test NaN behaviour
// let mut enc: RLE<f64> = RLE::default();
// enc.push_additional_none(3);
// enc.push(22.3);
// enc.push_additional_some(f64::NAN, 1);
// enc.push_additional_some(f64::NEG_INFINITY, 1);
// enc.push_additional_some(f64::INFINITY, 1);
// // All non-null rows match !- 2.0 including the NaN / +- ∞
// let dst = enc.row_ids_filter(2.0, &Operator::LT, RowIDs::new_vector());
// assert_eq!(dst.unwrap_vector(), &vec![3, 4, 5, 6]);
}
#[test]
fn row_ids_filter_lte() {}
#[test]
fn row_ids_filter_gt() {}
#[test]
fn row_ids_filter_gte() {}
#[test]
fn row_ids_filter_range() {}
}

View File

@ -333,6 +333,7 @@ impl From<arrow::array::Float64Array> for FloatEncoding {
#[cfg(test)]
mod test {
use super::*;
use cmp::Operator;
#[test]
fn size_raw() {
@ -355,4 +356,60 @@ mod test {
assert_eq!(enc.size_raw(true), 64);
assert_eq!(enc.size_raw(false), 56);
}
#[test]
// Runs the NaN / ±∞ filter checks against the RLE64, Fixed64 and FixedNull64
// float encodings, each built from the same input data.
//
// TODO(edd): I need to add the correct comparators to the scalar encodings
// so that they behave the same as PG
//
fn row_ids_filter_nan() {
    let data = vec![22.3, f64::NAN, f64::NEG_INFINITY, f64::INFINITY];

    let encodings = vec![
        FloatEncoding::RLE64(RLE::from(data.clone())),
        FloatEncoding::Fixed64(Fixed::from(data.as_slice())),
        FloatEncoding::FixedNull64(FixedNull::from(data.as_slice())),
    ];

    encodings.into_iter().for_each(_row_ids_filter_nan);
}
// Expected row ids for filtering an encoding holding
// `[22.3, NaN, -∞, ∞]` (rows 0..=3) with each operator.
//
// The expectations follow PG ordering, where NaN is equal to itself and
// greater than every other value, including ∞.
//
// The assertions are disabled until PG-like comparators exist, so `_enc` is
// currently unused.
fn _row_ids_filter_nan(_enc: FloatEncoding) {
    // These cases replicate the behaviour in PG.
    let cases = vec![
        // No row holds 2.0, so equality matches nothing.
        (2.0, Operator::Equal, vec![]),                             //
        (2.0, Operator::NotEqual, vec![0, 1, 2, 3]),                // 22.3, NaN, -∞, ∞
        (2.0, Operator::LT, vec![2]),                               // -∞
        (2.0, Operator::LTE, vec![2]),                              // -∞
        (2.0, Operator::GT, vec![0, 1, 3]),                         // 22.3, NaN, ∞
        (2.0, Operator::GTE, vec![0, 1, 3]),                        // 22.3, NaN, ∞
        (f64::NAN, Operator::Equal, vec![1]),                       // NaN
        (f64::NAN, Operator::NotEqual, vec![0, 2, 3]),              // 22.3, -∞, ∞
        (f64::NAN, Operator::LT, vec![0, 2, 3]),                    // 22.3, -∞, ∞
        (f64::NAN, Operator::LTE, vec![0, 1, 2, 3]),                // 22.3, NaN, -∞, ∞
        (f64::NAN, Operator::GT, vec![]),                           //
        (f64::NAN, Operator::GTE, vec![1]),                         // NaN
        (f64::INFINITY, Operator::Equal, vec![3]),                  // ∞
        (f64::INFINITY, Operator::NotEqual, vec![0, 1, 2]),         // 22.3, NaN, -∞
        (f64::INFINITY, Operator::LT, vec![0, 2]),                  // 22.3, -∞
        (f64::INFINITY, Operator::LTE, vec![0, 2, 3]),              // 22.3, -∞, ∞
        (f64::INFINITY, Operator::GT, vec![1]),                     // NaN
        (f64::INFINITY, Operator::GTE, vec![1, 3]),                 // NaN, ∞
        (f64::NEG_INFINITY, Operator::Equal, vec![2]),              // -∞
        (f64::NEG_INFINITY, Operator::NotEqual, vec![0, 1, 3]),     // 22.3, NaN, ∞
        (f64::NEG_INFINITY, Operator::LT, vec![]),                  //
        (f64::NEG_INFINITY, Operator::LTE, vec![2]),                // -∞
        (f64::NEG_INFINITY, Operator::GT, vec![0, 1, 3]),           // 22.3, NaN, ∞
        (f64::NEG_INFINITY, Operator::GTE, vec![0, 1, 2, 3]),       // 22.3, NaN, -∞, ∞
    ];

    // TODO(edd): I need to add support for PG-like comparators for NaN etc.
    for (_v, _op, _exp) in cases {
        //let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector());
        //assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v);
    }
}
}