From 70b0ba44b303ec48e124df61886eb6e13c7713fa Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 10 Sep 2021 17:15:37 +0100 Subject: [PATCH 1/8] test: failing filter test --- read_buffer/src/column/float.rs | 87 +++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs index 56022c180f..ae89ec3574 100644 --- a/read_buffer/src/column/float.rs +++ b/read_buffer/src/column/float.rs @@ -754,4 +754,91 @@ mod test { //assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v); } } + + #[test] + fn row_ids_filter_float_trimmer() { + let data = vec![100.0, 200.0, 100.0, 300.0, 400.0]; + + let float_trimmer = FloatByteTrimmer {}; + let data_float_trimmed = data + .iter() + .cloned() + .map::(|x| float_trimmer.encode(x)) + .collect::>(); + + let cases: Vec>> = vec![ + Box::new(RLE::<_, _, _>::new_from_iter( + data.iter().cloned(), + NoOpTranscoder {}, + )), + Box::new(Fixed::::new( + data_float_trimmed.clone(), + float_trimmer, + )), + Box::new(FixedNull::::new( + PrimitiveArray::from(data_float_trimmed), + FloatByteTrimmer {}, + )), + ]; + + for enc in cases { + _row_ids_filter_float_trimmer(enc) + } + } + + fn _row_ids_filter_float_trimmer(enc: Box>) { + // These cases replicate the behaviour in PG. 
+ // + // [100.0, 200.0, 100.0, 300.0, 400.0] + let cases = vec![ + (100.0, Operator::Equal, vec![0, 2]), // 100.0, 100.0 + (100.0, Operator::NotEqual, vec![1, 3, 4]), // 200.0, 300.0, 400.0 + (100.0, Operator::LT, vec![]), // + (100.0, Operator::LTE, vec![0, 2]), // 100.0, 100.0 + (100.0, Operator::GT, vec![1, 3, 4]), // 200.0, 300.0, 400.0 + (100.0, Operator::GTE, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (200.0, Operator::Equal, vec![1]), // 200.0 + (200.0, Operator::NotEqual, vec![0, 2, 3, 4]), // 100.0, 100.0, 300.0, 400.0 + (200.0, Operator::LT, vec![0, 2]), // 100.0, 100.0 + (200.0, Operator::LTE, vec![0, 1, 2]), // 100.0, 200.0, 100.0 + (200.0, Operator::GT, vec![3, 4]), // 300.0, 400.0 + (200.0, Operator::GTE, vec![1, 3, 4]), // 200.0, 300.0, 400.0 + (400.0, Operator::Equal, vec![4]), // 400.0 + (400.0, Operator::NotEqual, vec![0, 1, 2, 3]), // 100.0, 200.0, 100.0, 300.0 + (400.0, Operator::LT, vec![0, 1, 2, 3]), // 100.0, 200.0, 100.0, 300.0 + (400.0, Operator::LTE, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (400.0, Operator::GT, vec![]), // + (400.0, Operator::GTE, vec![4]), // 400.0 + // Values not present in the column + (99.0, Operator::Equal, vec![]), // + (99.0, Operator::NotEqual, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (99.0, Operator::LT, vec![]), // + (99.0, Operator::LTE, vec![]), // + (99.0, Operator::GT, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (99.0, Operator::GTE, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (200.4, Operator::Equal, vec![]), // + (200.4, Operator::NotEqual, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (200.4, Operator::LT, vec![0, 1, 2]), // 100.0, 200.0, 100.0 + (200.4, Operator::LTE, vec![0, 1, 2]), // 100.0, 200.0, 100.0 + (200.4, Operator::GT, vec![3, 4]), // 300.0, 400.0 + (200.4, Operator::GTE, vec![3, 4]), // 300.0, 400.0 + (201.0, Operator::Equal, vec![]), // + (201.0, Operator::NotEqual, vec![0, 1, 2, 3, 
4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (201.0, Operator::LT, vec![0, 1, 2]), // 100.0, 200.0, 100.0 + (201.0, Operator::LTE, vec![0, 1, 2]), // 100.0, 200.0, 100.0 + (201.0, Operator::GT, vec![3, 4]), // 300.0, 400.0 + (201.0, Operator::GTE, vec![3, 4]), // 300.0, 400.0 + (401.0, Operator::Equal, vec![]), // + (401.0, Operator::NotEqual, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (401.0, Operator::LT, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (401.0, Operator::LTE, vec![0, 1, 2, 3, 4]), // 100.0, 200.0, 100.0, 300.0, 400.0 + (401.0, Operator::GT, vec![]), // + (401.0, Operator::GTE, vec![]), // + ]; + + for (v, op, exp) in cases { + let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector()); + assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v); + } + } } From d04a0d1137740cce887e7cf678517ddbf60459e2 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 14 Sep 2021 18:48:57 +0100 Subject: [PATCH 2/8] feat: add method for identifying all non-null row IDs --- .../src/column/encoding/scalar/fixed_null.rs | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/read_buffer/src/column/encoding/scalar/fixed_null.rs b/read_buffer/src/column/encoding/scalar/fixed_null.rs index 7168a28d75..410df6e0ee 100644 --- a/read_buffer/src/column/encoding/scalar/fixed_null.rs +++ b/read_buffer/src/column/encoding/scalar/fixed_null.rs @@ -3,6 +3,7 @@ //! This encoding stores a column of fixed-width numerical values backed by an //! an Arrow array, allowing for storage of NULL values. use either::Either; +use observability_deps::tracing::debug; use std::fmt::Debug; use std::marker::PhantomData; use std::mem::size_of; @@ -227,6 +228,38 @@ where } dst } + + // Identify all row IDs that contain a non-null value. 
+ fn all_non_null_row_ids(&self, mut dst: RowIDs) -> RowIDs { + dst.clear(); + + let mut found = false; + let mut count = 0; + for i in 0..self.num_rows() as usize { + if self.arr.is_null(i) && found { + // add the non-null range + let (min, max) = (i as u32 - count, i as u32); + dst.add_range(min, max); + found = false; + count = 0; + continue; + } else if self.arr.is_null(i) { + continue; + } + + if !found { + found = true; + } + count += 1; + } + + // add any remaining range. + if found { + let (min, max) = (self.num_rows() - count, self.num_rows()); + dst.add_range(min, max); + } + dst + } } impl ScalarEncoding for FixedNull @@ -789,6 +822,38 @@ mod test { assert_eq!(row_ids.to_vec(), vec![1, 2, 4]); } + #[test] + fn row_ids_filter_range_all_non_null() { + let cases = vec![ + (vec![None], vec![]), + (vec![None, None, None], vec![]), + (vec![Some(22)], vec![0_u32]), + (vec![Some(22), Some(3), Some(3)], vec![0, 1, 2]), + (vec![Some(22), None], vec![0]), + ( + vec![Some(22), None, Some(1), None, Some(3), None], + vec![0, 3, 4], + ), + (vec![Some(22), None, None, Some(33)], vec![0, 3]), + (vec![None, None, Some(33)], vec![2]), + ( + vec![None, None, Some(33), None, None, Some(3), Some(3), Some(1)], + vec![2, 5, 6, 7], + ), + ]; + + for (i, (data, exp)) in cases.into_iter().enumerate() { + let (v, _) = new_encoding(data); + let dst = RowIDs::new_vector(); + assert_eq!( + v.all_non_null_row_ids(dst).unwrap_vector(), + &exp, + "example {:?} failed", + i + ); + } + } + #[test] fn has_non_null_value() { let (v, _) = new_encoding(vec![None, None]); From 483508e3c62e9f809bb3f98a4d815051cf8a2deb Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 14 Sep 2021 22:48:03 +0100 Subject: [PATCH 3/8] feat: add rle method for identifying all non-null row IDs --- read_buffer/src/column/encoding/scalar/rle.rs | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index 
fc8c1b3bc2..09ecbe05c4 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -305,6 +305,25 @@ where dst } + fn all_non_null_row_ids(&self, mut dst: RowIDs) -> RowIDs { + dst.clear(); + + if self.null_count() == 0 { + dst.add_range(0, self.num_rows()); + return dst; + } + + let mut curr_logical_row_id = 0; + for (rl, next) in &self.run_lengths { + if next.is_some() { + dst.add_range(curr_logical_row_id, curr_logical_row_id + rl); + } + curr_logical_row_id += rl; + } + + dst + } + // Helper function to convert comparison operators to cmp orderings. fn ord_from_op(op: &cmp::Operator) -> (Ordering, Ordering) { match op { @@ -622,6 +641,16 @@ mod test { ) } + fn new_encoding_opt( + values: Vec>, + ) -> (RLE>, Arc) { + let mock = Arc::new(MockTranscoder::default()); + ( + RLE::new_from_iter_opt(values.into_iter(), Arc::clone(&mock)), + mock, + ) + } + #[test] fn new_from_iter() { let cases = vec![ @@ -981,6 +1010,38 @@ mod test { } } + #[test] + fn row_ids_filter_range_all_non_null() { + let cases = vec![ + (vec![None], vec![]), + (vec![None, None, None], vec![]), + (vec![Some(22)], vec![0_u32]), + (vec![Some(22), Some(3), Some(3)], vec![0, 1, 2]), + (vec![Some(22), None], vec![0]), + ( + vec![Some(22), None, Some(1), None, Some(3), None], + vec![0, 2, 4], + ), + (vec![Some(22), None, None, Some(33)], vec![0, 3]), + (vec![None, None, Some(33)], vec![2]), + ( + vec![None, None, Some(33), None, None, Some(3), Some(3), Some(1)], + vec![2, 5, 6, 7], + ), + ]; + + for (i, (data, exp)) in cases.into_iter().enumerate() { + let (v, _) = new_encoding_opt(data); + let dst = RowIDs::new_vector(); + assert_eq!( + v.all_non_null_row_ids(dst).unwrap_vector(), + &exp, + "example {:?} failed", + i + ); + } + } + #[test] fn row_ids_filter_range() { let (enc, transcoder) = new_encoding(vec![ From 1a70865a03e24146cbae686f97da7dae653e9046 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 14 Sep 2021 22:56:22 +0100 Subject: 
[PATCH 4/8] fix: ensure float byte trimmed predicate pushdown works for unencodable values --- .../src/column/encoding/scalar/fixed.rs | 29 ++++- .../src/column/encoding/scalar/fixed_null.rs | 41 +++++-- read_buffer/src/column/encoding/scalar/rle.rs | 36 +++++-- .../src/column/encoding/scalar/transcoders.rs | 95 ++++++++++++++-- read_buffer/src/column/float.rs | 101 +++++++++++++++++- 5 files changed, 270 insertions(+), 32 deletions(-) diff --git a/read_buffer/src/column/encoding/scalar/fixed.rs b/read_buffer/src/column/encoding/scalar/fixed.rs index 6cf6b0b63e..5bd15667f5 100644 --- a/read_buffer/src/column/encoding/scalar/fixed.rs +++ b/read_buffer/src/column/encoding/scalar/fixed.rs @@ -7,6 +7,7 @@ //! allow results to be emitted as some logical type `L` via a transformation //! `T`. use either::Either; +use observability_deps::tracing::debug; use std::cmp::Ordering; use std::fmt::Debug; use std::marker::PhantomData; @@ -373,14 +374,36 @@ where Some(self.transcoder.decode(max)) } - fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs { - let value = self.transcoder.encode(value); + fn row_ids_filter(&self, value: L, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs { + debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter"); + let (value, op) = match self.transcoder.encode_comparable(value, *op) { + Some((value, op)) => (value, op), + None => { + // The value is not encodable. This can happen with the == or != + // operator. In the case of ==, no values in the encoding could + // possible satisfy the expression. In the case of !=, all + // values would satisfy the expression. 
+ dst.clear(); + return match op { + cmp::Operator::Equal => dst, + cmp::Operator::NotEqual => { + dst.add_range(0, self.num_rows()); + dst + } + op => panic!("operator {:?} not expected", op), + }; + } + }; + debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter encoded expr"); + + // N.B, the transcoder may have changed the operator depending on the + // value provided. match op { cmp::Operator::GT => self.row_ids_cmp_order(&value, PartialOrd::gt, dst), cmp::Operator::GTE => self.row_ids_cmp_order(&value, PartialOrd::ge, dst), cmp::Operator::LT => self.row_ids_cmp_order(&value, PartialOrd::lt, dst), cmp::Operator::LTE => self.row_ids_cmp_order(&value, PartialOrd::le, dst), - _ => self.row_ids_equal(&value, op, dst), + _ => self.row_ids_equal(&value, &op, dst), } } diff --git a/read_buffer/src/column/encoding/scalar/fixed_null.rs b/read_buffer/src/column/encoding/scalar/fixed_null.rs index 410df6e0ee..85cb3d6fb2 100644 --- a/read_buffer/src/column/encoding/scalar/fixed_null.rs +++ b/read_buffer/src/column/encoding/scalar/fixed_null.rs @@ -233,6 +233,11 @@ where fn all_non_null_row_ids(&self, mut dst: RowIDs) -> RowIDs { dst.clear(); + if self.null_count() == 0 { + dst.add_range(0, self.num_rows()); + return dst; + } + let mut found = false; let mut count = 0; for i in 0..self.num_rows() as usize { @@ -444,14 +449,34 @@ where max.map(|v| self.transcoder.decode(v)) } - fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs { - let value = self.transcoder.encode(value); + fn row_ids_filter(&self, value: L, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs { + debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter"); + let (value, op) = match self.transcoder.encode_comparable(value, *op) { + Some((value, op)) => (value, op), + None => { + // The value is not encodable. This can happen with the == or != + // operator. In the case of ==, no values in the encoding could + // possible satisfy the expression. 
In the case of !=, all + // non-null values would satisfy the expression. + dst.clear(); + return match op { + cmp::Operator::Equal => dst, + cmp::Operator::NotEqual => { + dst = self.all_non_null_row_ids(dst); + dst + } + op => panic!("operator {:?} not expected", op), + }; + } + }; + debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter encoded expr"); + match op { - cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst), - cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst), - cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst), - cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst), - _ => self.row_ids_equal(value, op, dst), + cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), + cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), + cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), + cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), + _ => self.row_ids_equal(value, &op, dst), } } @@ -832,7 +857,7 @@ mod test { (vec![Some(22), None], vec![0]), ( vec![Some(22), None, Some(1), None, Some(3), None], - vec![0, 3, 4], + vec![0, 2, 4], ), (vec![Some(22), None, None, Some(33)], vec![0, 3]), (vec![None, None, Some(33)], vec![2]), diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index 09ecbe05c4..afef119edc 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -1,4 +1,5 @@ use either::Either; +use observability_deps::tracing::debug; use crate::column::cmp; use crate::column::RowIDs; @@ -394,15 +395,34 @@ where self.num_rows } - fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs { - let value = self.transcoder.encode(value); + fn row_ids_filter(&self, value: L, op: &cmp::Operator, mut 
dst: RowIDs) -> RowIDs { + debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter"); + let (value, op) = match self.transcoder.encode_comparable(value, *op) { + Some((value, op)) => (value, op), + None => { + // The value is not encodable. This can happen with the == or != + // operator. In the case of ==, no values in the encoding could + // possible satisfy the expression. In the case of !=, all + // non-null values would satisfy the expression. + dst.clear(); + return match op { + cmp::Operator::Equal => dst, + cmp::Operator::NotEqual => { + dst = self.all_non_null_row_ids(dst); + dst + } + op => panic!("operator {:?} not expected", op), + }; + } + }; + debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter encoded expr"); + match op { - cmp::Operator::Equal | cmp::Operator::NotEqual => { - self.row_ids_cmp_equal(value, op, dst) - } - cmp::Operator::LT | cmp::Operator::LTE | cmp::Operator::GT | cmp::Operator::GTE => { - self.row_ids_cmp(value, op, dst) - } + cmp::Operator::GT => self.row_ids_cmp(value, &op, dst), + cmp::Operator::GTE => self.row_ids_cmp(value, &op, dst), + cmp::Operator::LT => self.row_ids_cmp(value, &op, dst), + cmp::Operator::LTE => self.row_ids_cmp(value, &op, dst), + _ => self.row_ids_cmp_equal(value, &op, dst), } } diff --git a/read_buffer/src/column/encoding/scalar/transcoders.rs b/read_buffer/src/column/encoding/scalar/transcoders.rs index 2628fde983..da5db2c6b7 100644 --- a/read_buffer/src/column/encoding/scalar/transcoders.rs +++ b/read_buffer/src/column/encoding/scalar/transcoders.rs @@ -1,3 +1,4 @@ +use crate::column::cmp::Operator; use std::{ convert::TryFrom, fmt::{Debug, Display}, @@ -13,7 +14,17 @@ use std::{ // `P` is a physical type that is stored directly within an encoding, `L` is // a logical type callers expect to be returned. pub trait Transcoder: Debug + Display { + /// A function that encodes a logical value into a physical representation. 
fn encode(&self, _: L) -> P; + + /// A function that attempts to encode a logical value, within the context + /// of a comparison operator, into a physical representation. + /// + /// Implementation should return a suitable operator for the physical + /// representation, which may differ from the provided operator. + fn encode_comparable(&self, _: L, _: Operator) -> Option<(P, Operator)>; + + /// A function to decode a physical representation back into a logical value. fn decode(&self, _: P) -> L; } @@ -29,6 +40,10 @@ impl Transcoder for NoOpTranscoder { v } + fn encode_comparable(&self, v: T, op: Operator) -> Option<(T, Operator)> { + Some((v, op)) + } + fn decode(&self, v: T) -> T { v } @@ -56,13 +71,17 @@ pub struct ByteTrimmer {} impl Transcoder for ByteTrimmer where L: From

, - P: TryFrom, + P: TryFrom + PartialEq + PartialOrd,

>::Error: std::fmt::Debug, { fn encode(&self, v: L) -> P { P::try_from(v).unwrap() } + fn encode_comparable(&self, v: L, op: Operator) -> Option<(P, Operator)> { + P::try_from(v).ok().map(|p| (p, op)) + } + fn decode(&self, v: P) -> L { L::from(v) } @@ -91,6 +110,54 @@ macro_rules! make_float_trimmer { v as $type } + fn encode_comparable(&self, v: f64, op: Operator) -> Option<($type, Operator)> { + assert!(v <= <$type>::MAX as f64); + if v == ((v as $type) as f64) { + return Some((v as $type, op)); + } + + match op { + Operator::Equal => { + None // no encoded values will == v + } + Operator::NotEqual => { + None // all encoded values will != v + } + Operator::LT => { + // convert to next highest encodable value. For example + // given '< 23.2` return 24.0 encoded as the physical + // type. < 23.2 is logically equivalent to < 24.0 since + // there are no valid values in the domain (23.2, 24.0). + Some((v.ceil() as $type, op)) + } + Operator::LTE => { + // convert to next highest encodable value and change + // operator to <. + // For example given '<= 23.2` return 24.0 encoded as + // the physical type. <= 23.2 is logically equivalent + // to < 24.0 since there are no valid values in the + // domain [23.2, 24.0). + Some((v.ceil() as $type, Operator::LT)) + } + Operator::GT => { + // convert to next lowest encodable value. For example + // given '> 23.2` return 23.0 encoded as the physical + // type. > 23.2 is logically equivalent to > 23.0 since + // there are no valid values in the domain (23.0, 23.2]. + Some((v.floor() as $type, op)) + } + Operator::GTE => { + // convert to next lowest encodable value and change + // operator to >. + // For example given '>= 23.2` return 23.0 encoded as + // the physical type. >= 23.2 is logically equivalent + // to > 23.0 since there are no valid values in the + // domain (23.0, 23.2). 
+ Some((v.floor() as $type, Operator::GT)) + } + } + } + fn decode(&self, v: $type) -> f64 { v.into() } @@ -119,7 +186,7 @@ impl Display for FloatByteTrimmer { // result. #[cfg(test)] -use std::{sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::Arc}; +use std::{sync::atomic, sync::atomic::AtomicUsize, sync::Arc}; #[cfg(test)] /// A mock implementation of Transcoder that tracks calls to encode and decode. /// This is useful for testing encoder implementations. @@ -127,6 +194,7 @@ use std::{sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::Arc}; pub struct MockTranscoder { encoding_calls: AtomicUsize, decoding_calls: AtomicUsize, + partial_cmp_calls: AtomicUsize, } #[cfg(test)] @@ -135,6 +203,7 @@ impl Default for MockTranscoder { Self { encoding_calls: AtomicUsize::default(), decoding_calls: AtomicUsize::default(), + partial_cmp_calls: AtomicUsize::default(), } } } @@ -142,23 +211,28 @@ impl Default for MockTranscoder { #[cfg(test)] impl MockTranscoder { pub fn encodings(&self) -> usize { - self.encoding_calls.load(Ordering::Relaxed) + self.encoding_calls.load(atomic::Ordering::Relaxed) } pub fn decodings(&self) -> usize { - self.decoding_calls.load(Ordering::Relaxed) + self.decoding_calls.load(atomic::Ordering::Relaxed) } } #[cfg(test)] impl Transcoder for MockTranscoder { fn encode(&self, v: T) -> T { - self.encoding_calls.fetch_add(1, Ordering::Relaxed); + self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed); v } + fn encode_comparable(&self, v: T, op: Operator) -> Option<(T, Operator)> { + self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed); + Some((v, op)) + } + fn decode(&self, v: T) -> T { - self.decoding_calls.fetch_add(1, Ordering::Relaxed); + self.decoding_calls.fetch_add(1, atomic::Ordering::Relaxed); v } } @@ -166,12 +240,17 @@ impl Transcoder for MockTranscoder { #[cfg(test)] impl Transcoder for Arc { fn encode(&self, v: T) -> T { - self.encoding_calls.fetch_add(1, Ordering::Relaxed); + self.encoding_calls.fetch_add(1, 
atomic::Ordering::Relaxed); v } + fn encode_comparable(&self, v: T, op: Operator) -> Option<(T, Operator)> { + self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed); + Some((v, op)) + } + fn decode(&self, v: T) -> T { - self.decoding_calls.fetch_add(1, Ordering::Relaxed); + self.decoding_calls.fetch_add(1, atomic::Ordering::Relaxed); v } } diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs index ae89ec3574..4240e8481d 100644 --- a/read_buffer/src/column/float.rs +++ b/read_buffer/src/column/float.rs @@ -767,13 +767,13 @@ mod test { .collect::>(); let cases: Vec>> = vec![ - Box::new(RLE::<_, _, _>::new_from_iter( - data.iter().cloned(), - NoOpTranscoder {}, + Box::new(RLE::::new_from_iter( + data_float_trimmed.iter().cloned(), + float_trimmer, )), Box::new(Fixed::::new( data_float_trimmed.clone(), - float_trimmer, + FloatByteTrimmer {}, )), Box::new(FixedNull::::new( PrimitiveArray::from(data_float_trimmed), @@ -838,7 +838,98 @@ mod test { for (v, op, exp) in cases { let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector()); - assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v); + assert_eq!( + dst.unwrap_vector(), + &exp, + "example '{} {:?}' failed for {:?}", + op, + v, + enc.name() + ); + } + } + + #[test] + fn row_ids_filter_float_trimmer_with_nulls() { + let data = vec![Some(100.0), None, None, Some(200.0), None]; + + let float_trimmer = FloatByteTrimmer {}; + + let cases: Vec>> = vec![ + Box::new(RLE::::new_from_iter_opt( + data.iter() + .cloned() + .map(|x| x.map(|v| float_trimmer.encode(v))), + FloatByteTrimmer {}, + )), + Box::new(FixedNull::::new( + data.iter() + .cloned() + .map(|v| v.map(|v| float_trimmer.encode(v))) + .collect(), + FloatByteTrimmer {}, + )), + ]; + + for enc in cases { + _row_ids_filter_float_trimmer_with_nulls(enc) + } + } + + fn _row_ids_filter_float_trimmer_with_nulls(enc: Box>) { + // These cases replicate the behaviour in PG. 
+ // + // [100.0, NULL, NULL, 200.0] + let cases = vec![ + (100.0, Operator::Equal, vec![0]), // 100.0 + (100.0, Operator::NotEqual, vec![3]), // 200.0 + (100.0, Operator::LT, vec![]), // + (100.0, Operator::LTE, vec![0]), // 100.0 + (100.0, Operator::GT, vec![3]), // 200.0 + (100.0, Operator::GTE, vec![0, 3]), // 100.0, 200.0 + (200.0, Operator::Equal, vec![3]), // 200.0 + (200.0, Operator::NotEqual, vec![0]), // 100.0 + (200.0, Operator::LT, vec![0]), // 100.0 + (200.0, Operator::LTE, vec![0, 3]), // 100.0, 200.0 + (200.0, Operator::GT, vec![]), // + (200.0, Operator::GTE, vec![3]), // 200.0 + // Values not present in the column + (99.0, Operator::Equal, vec![]), // + (99.0, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0 + (99.0, Operator::LT, vec![]), // + (99.0, Operator::LTE, vec![]), // + (99.0, Operator::GT, vec![0, 3]), // 100.0, 200.0 + (99.0, Operator::GTE, vec![0, 3]), // 100.0, 200.0 + (200.4, Operator::Equal, vec![]), // + (200.4, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0 + (200.4, Operator::LT, vec![0, 3]), // 100.0,200.0 + (200.4, Operator::LTE, vec![0, 3]), // 100.0, 200.0 + (200.4, Operator::GT, vec![]), // + (200.4, Operator::GTE, vec![]), // + (201.0, Operator::Equal, vec![]), // + (201.0, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0 + (201.0, Operator::LT, vec![0, 3]), // 100.0, 200.0 + (201.0, Operator::LTE, vec![0, 3]), // 100.0, 200.0 + (201.0, Operator::GT, vec![]), // + (201.0, Operator::GTE, vec![]), // + (401.0, Operator::Equal, vec![]), // + (401.0, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0 + (401.0, Operator::LT, vec![0, 3]), // 100.0, 200.0 + (401.0, Operator::LTE, vec![0, 3]), // 100.0, 200.0 + (401.0, Operator::GT, vec![]), // + (401.0, Operator::GTE, vec![]), // + ]; + + for (v, op, exp) in cases { + let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector()); + assert_eq!( + dst.unwrap_vector(), + &exp, + "example '{} {:?}' failed for {:?}", + op, + v, + enc.name() + ); } } } From 
0250bd1337c8c54c44446bcdfca0d0baba97bb97 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 15 Sep 2021 13:37:41 +0100 Subject: [PATCH 5/8] fix: ensure range filter works with null --- read_buffer/src/column/encoding/scalar/rle.rs | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index afef119edc..dbe1f44c3c 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -289,18 +289,16 @@ where let left_cmp_result = next.partial_cmp(left.0); let right_cmp_result = next.partial_cmp(right.0); - // TODO(edd): eurgh I still don't understand how I got this to - // be correct. Need to revisit to make it simpler. let left_result_ok = - !(left_cmp_result == Some(left_op.0) || left_cmp_result == Some(left_op.1)); + left_cmp_result == Some(left_op.0) || left_cmp_result == Some(left_op.1); let right_result_ok = - !(right_cmp_result == Some(right_op.0) || right_cmp_result == Some(right_op.1)); + right_cmp_result == Some(right_op.0) || right_cmp_result == Some(right_op.1); - if !(left_result_ok || right_result_ok) { + if left_result_ok && right_result_ok { dst.add_range(curr_logical_row_id, curr_logical_row_id + rl); } - curr_logical_row_id += rl; } + curr_logical_row_id += rl; } dst @@ -1097,6 +1095,37 @@ mod test { assert_eq!(transcoder.encodings(), calls * 2); } + #[test] + fn row_ids_filter_range_nulls() { + let (enc, transcoder) = new_encoding_opt(vec![ + Some(100), + None, + None, + None, + Some(100), + Some(101), + Some(101), + ]); + + let cases = vec![ + ( + (100, &Operator::GTE), + (240, &Operator::LT), + vec![0, 4, 5, 6], + ), + ((100, &Operator::GT), (240, &Operator::LT), vec![5, 6]), + ((10, &Operator::LT), (-100, &Operator::GT), vec![]), + ((21, &Operator::GTE), (100, &Operator::LTE), vec![0, 4]), + ]; + + let calls = cases.len(); + for (left, right, exp) in cases { + let dst = 
enc.row_ids_filter_range(left, right, RowIDs::new_vector()); + assert_eq!(dst.unwrap_vector(), &exp); + } + assert_eq!(transcoder.encodings(), calls * 2); + } + #[test] fn estimate_rle_size() { let cases = vec![ From d387108dabfca3d2af07e4eb2e9012e3ed7373a5 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 15 Sep 2021 13:39:58 +0100 Subject: [PATCH 6/8] fix: float byte trimmer filter range --- .../src/column/encoding/scalar/fixed.rs | 16 ++- .../src/column/encoding/scalar/fixed_null.rs | 16 ++- read_buffer/src/column/encoding/scalar/rle.rs | 16 ++- read_buffer/src/column/float.rs | 129 +++++++++++++++++- 4 files changed, 161 insertions(+), 16 deletions(-) diff --git a/read_buffer/src/column/encoding/scalar/fixed.rs b/read_buffer/src/column/encoding/scalar/fixed.rs index 5bd15667f5..f8d8193a0b 100644 --- a/read_buffer/src/column/encoding/scalar/fixed.rs +++ b/read_buffer/src/column/encoding/scalar/fixed.rs @@ -413,8 +413,16 @@ where right: (L, &cmp::Operator), dst: RowIDs, ) -> RowIDs { - let left = (self.transcoder.encode(left.0), left.1); - let right = (self.transcoder.encode(right.0), right.1); + debug!(left=?left, right=?right, encoding=?ENCODING_NAME, "row_ids_filter_range"); + let left = self + .transcoder + .encode_comparable(left.0, *left.1) + .expect("transcoder must return Some variant"); + let right = self + .transcoder + .encode_comparable(right.0, *right.1) + .expect("transcoder must return Some variant"); + debug!(left=?left, right=?right, encoding=?ENCODING_NAME, "row_ids_filter_range encoded expr"); match (&left.1, &right.1) { (cmp::Operator::GT, cmp::Operator::LT) @@ -425,8 +433,8 @@ where | (cmp::Operator::LT, cmp::Operator::GTE) | (cmp::Operator::LTE, cmp::Operator::GT) | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order( - (&left.0, Self::ord_from_op(left.1)), - (&right.0, Self::ord_from_op(right.1)), + (&left.0, Self::ord_from_op(&left.1)), + (&right.0, Self::ord_from_op(&right.1)), dst, ), diff --git 
a/read_buffer/src/column/encoding/scalar/fixed_null.rs b/read_buffer/src/column/encoding/scalar/fixed_null.rs index 85cb3d6fb2..c0e8c4955a 100644 --- a/read_buffer/src/column/encoding/scalar/fixed_null.rs +++ b/read_buffer/src/column/encoding/scalar/fixed_null.rs @@ -486,8 +486,16 @@ where right: (L, &cmp::Operator), dst: RowIDs, ) -> RowIDs { - let left = (self.transcoder.encode(left.0), left.1); - let right = (self.transcoder.encode(right.0), right.1); + debug!(left=?left, right=?right, encoding=?ENCODING_NAME, "row_ids_filter_range"); + let left = self + .transcoder + .encode_comparable(left.0, *left.1) + .expect("transcoder must return Some variant"); + let right = self + .transcoder + .encode_comparable(right.0, *right.1) + .expect("transcoder must return Some variant"); + debug!(left=?left, right=?right, encoding=?ENCODING_NAME, "row_ids_filter_range encoded expr"); match (left.1, right.1) { (cmp::Operator::GT, cmp::Operator::LT) @@ -498,8 +506,8 @@ where | (cmp::Operator::LT, cmp::Operator::GTE) | (cmp::Operator::LTE, cmp::Operator::GT) | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order( - (left.0, Self::ord_from_op(left.1)), - (right.0, Self::ord_from_op(right.1)), + (left.0, Self::ord_from_op(&left.1)), + (right.0, Self::ord_from_op(&right.1)), dst, ), diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index dbe1f44c3c..80d81e777f 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -430,8 +430,16 @@ where right: (L, &cmp::Operator), dst: RowIDs, ) -> RowIDs { - let left = (self.transcoder.encode(left.0), left.1); - let right = (self.transcoder.encode(right.0), right.1); + debug!(left=?left, right=?right, encoding=?ENCODING_NAME, "row_ids_filter_range"); + let left = self + .transcoder + .encode_comparable(left.0, *left.1) + .expect("transcoder must return Some variant"); + let right = self + .transcoder + 
.encode_comparable(right.0, *right.1) + .expect("transcoder must return Some variant"); + debug!(left=?left, right=?right, encoding=?ENCODING_NAME, "row_ids_filter_range encoded expr"); match (&left.1, &right.1) { (cmp::Operator::GT, cmp::Operator::LT) @@ -442,8 +450,8 @@ where | (cmp::Operator::LT, cmp::Operator::GTE) | (cmp::Operator::LTE, cmp::Operator::GT) | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range( - (&left.0, Self::ord_from_op(left.1)), - (&right.0, Self::ord_from_op(right.1)), + (&left.0, Self::ord_from_op(&left.1)), + (&right.0, Self::ord_from_op(&right.1)), dst, ), diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs index 4240e8481d..d0755115f4 100644 --- a/read_buffer/src/column/float.rs +++ b/read_buffer/src/column/float.rs @@ -787,8 +787,6 @@ mod test { } fn _row_ids_filter_float_trimmer(enc: Box>) { - // These cases replicate the behaviour in PG. - // // [100.0, 200.0, 100.0, 300.0, 400.0] let cases = vec![ (100.0, Operator::Equal, vec![0, 2]), // 100.0, 100.0 @@ -877,8 +875,6 @@ mod test { } fn _row_ids_filter_float_trimmer_with_nulls(enc: Box>) { - // These cases replicate the behaviour in PG. 
- // // [100.0, NULL, NULL, 200.0] let cases = vec![ (100.0, Operator::Equal, vec![0]), // 100.0 @@ -932,4 +928,129 @@ mod test { ); } } + + #[test] + fn row_ids_filter_range_float_trimmer() { + let data = vec![100.0, 200.0, 100.0, 300.0, 400.0]; + + let float_trimmer = FloatByteTrimmer {}; + let data_float_trimmed = data + .iter() + .cloned() + .map::(|x| float_trimmer.encode(x)) + .collect::>(); + + let cases: Vec>> = vec![ + Box::new(RLE::::new_from_iter( + data_float_trimmed.iter().cloned(), + float_trimmer, + )), + Box::new(Fixed::::new( + data_float_trimmed.clone(), + FloatByteTrimmer {}, + )), + Box::new(FixedNull::::new( + PrimitiveArray::from(data_float_trimmed), + FloatByteTrimmer {}, + )), + ]; + + for enc in cases { + _row_ids_filter_range_float_trimmer(enc) + } + } + + fn _row_ids_filter_range_float_trimmer(enc: Box>) { + // [100.0, 200.0, 100.0, 300.0, 400.0] + let cases = vec![ + ((100.0, &Operator::LT), (99.0, &Operator::GT), vec![]), // + ((100.0, &Operator::LTE), (100.0, &Operator::GTE), vec![0, 2]), // 100.0, 100.0 + ( + (100.0, &Operator::GT), + (400.0, &Operator::LTE), + vec![1, 3, 4], + ), // 200.0, 300.0, 400.0 + ( + (100.0, &Operator::GTE), + (401.0, &Operator::LTE), + vec![0, 1, 2, 3, 4], + ), // 100.0, 200.0, 100.0, 300.0, 400.0 + ((200.0, &Operator::LT), (99.6, &Operator::GT), vec![0, 2]), // 100.0, 100.0 + ((200.0, &Operator::GT), (401.2, &Operator::LTE), vec![3, 4]), // 300.0, 400.0 + ( + (200.0, &Operator::GTE), + (400.9, &Operator::LT), + vec![1, 3, 4], + ), // 200.0, 300.0, 400.0 + ( + (99.8, &Operator::GT), + (500.87, &Operator::LT), + vec![0, 1, 2, 3, 4], + ), // 100.0, 200.0, 100.0, 300.0, 400.0 + ]; + + for (left, right, exp) in cases { + let dst = enc.row_ids_filter_range(left, right, RowIDs::new_vector()); + assert_eq!( + dst.unwrap_vector(), + &exp, + "example '{:?} {:?}' failed for {:?}", + left, + right, + enc.name(), + ); + } + } + + #[test] + fn row_ids_filter_range_float_trimmer_with_nulls() { + let data = 
vec![Some(100.0), None, None, Some(200.0), None]; + + let float_trimmer = FloatByteTrimmer {}; + + let cases: Vec>> = vec![ + Box::new(RLE::::new_from_iter_opt( + data.iter() + .cloned() + .map(|x| x.map(|v| float_trimmer.encode(v))), + FloatByteTrimmer {}, + )), + Box::new(FixedNull::::new( + data.iter() + .cloned() + .map(|v| v.map(|v| float_trimmer.encode(v))) + .collect(), + FloatByteTrimmer {}, + )), + ]; + + for enc in cases { + _row_ids_filter_range_float_trimmer_with_nulls(enc) + } + } + + fn _row_ids_filter_range_float_trimmer_with_nulls(enc: Box>) { + // [100.0, NULL, NULL, 200.0, NULL] + let cases = vec![ + ((100.0, &Operator::LT), (99.0, &Operator::GT), vec![]), // + ((100.0, &Operator::LTE), (100.0, &Operator::GTE), vec![0]), // 100.0 + ((100.0, &Operator::GT), (400.0, &Operator::LTE), vec![3]), // 200.0 + ((100.0, &Operator::GTE), (401.0, &Operator::LTE), vec![0, 3]), // 100.0, 200.0 + ((200.0, &Operator::LT), (99.6, &Operator::GT), vec![0]), // 100.0 + ((200.0, &Operator::GT), (401.2, &Operator::LTE), vec![]), // + ((99.8, &Operator::GT), (500.87, &Operator::LT), vec![0, 3]), // 100.0, 200.0 + ]; + + for (left, right, exp) in cases { + let dst = enc.row_ids_filter_range(left, right, RowIDs::new_vector()); + assert_eq!( + dst.unwrap_vector(), + &exp, + "example '{:?} {:?}' failed for {:?}", + left, + right, + enc.name(), + ); + } + } } From f7228ddd602a79d39fefae2b511e4405cf5a3a52 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 15 Sep 2021 22:04:10 +0100 Subject: [PATCH 7/8] test: add test for byte trimmed floats --- read_buffer/src/column.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 6a565e0031..85349b917c 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -2067,6 +2067,40 @@ mod test { assert!(matches!(row_ids, RowIDsOption::All(_))); } + #[test] + fn row_ids_filter_float_trimmed() { + let input = &[100.0, 200.0, 
300.0, 2.0, 22.0, 30.0]; + + let col = Column::from(&input[..]); + let mut row_ids = col.row_ids_filter( + &cmp::Operator::Equal, + &Value::from(200.0), + RowIDs::new_bitmap(), + ); + assert_eq!(row_ids.unwrap().to_vec(), vec![1]); + + row_ids = col.row_ids_filter( + &cmp::Operator::LT, + &Value::from(64000.0), + RowIDs::new_bitmap(), + ); + assert!(matches!(row_ids, RowIDsOption::All(_))); + + row_ids = col.row_ids_filter( + &cmp::Operator::GTE, + &Value::from(-1_000_000.0), + RowIDs::new_bitmap(), + ); + assert!(matches!(row_ids, RowIDsOption::All(_))); + + row_ids = col.row_ids_filter( + &cmp::Operator::NotEqual, + &Value::from(1_000_000.3), + RowIDs::new_bitmap(), + ); + assert!(matches!(row_ids, RowIDsOption::All(_))); + } + #[test] fn row_ids_range() { let input = &[100_i64, 200, 300, 2, 200, 22, 30]; From e51dd0365a573ed618b318342c1daf8232d30d1b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 16 Sep 2021 09:28:18 +0100 Subject: [PATCH 8/8] refactor: PR feedback Co-authored-by: Marko Mikulicic --- .../src/column/encoding/scalar/fixed_null.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/read_buffer/src/column/encoding/scalar/fixed_null.rs b/read_buffer/src/column/encoding/scalar/fixed_null.rs index c0e8c4955a..ed7a2e73bd 100644 --- a/read_buffer/src/column/encoding/scalar/fixed_null.rs +++ b/read_buffer/src/column/encoding/scalar/fixed_null.rs @@ -241,14 +241,14 @@ where let mut found = false; let mut count = 0; for i in 0..self.num_rows() as usize { - if self.arr.is_null(i) && found { - // add the non-null range - let (min, max) = (i as u32 - count, i as u32); - dst.add_range(min, max); - found = false; - count = 0; - continue; - } else if self.arr.is_null(i) { + if self.arr.is_null(i) { + if found { + // add the non-null range + let (min, max) = (i as u32 - count, i as u32); + dst.add_range(min, max); + found = false; + count = 0; + } continue; }