fix: ensure float byte trimmed predicate pushdown works for unencodable values
parent
483508e3c6
commit
1a70865a03
|
@ -7,6 +7,7 @@
|
|||
//! allow results to be emitted as some logical type `L` via a transformation
|
||||
//! `T`.
|
||||
use either::Either;
|
||||
use observability_deps::tracing::debug;
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
|
@ -373,14 +374,36 @@ where
|
|||
Some(self.transcoder.decode(max))
|
||||
}
|
||||
|
||||
fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
|
||||
let value = self.transcoder.encode(value);
|
||||
fn row_ids_filter(&self, value: L, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
|
||||
debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter");
|
||||
let (value, op) = match self.transcoder.encode_comparable(value, *op) {
|
||||
Some((value, op)) => (value, op),
|
||||
None => {
|
||||
// The value is not encodable. This can happen with the == or !=
|
||||
// operator. In the case of ==, no values in the encoding could
|
||||
// possible satisfy the expression. In the case of !=, all
|
||||
// values would satisfy the expression.
|
||||
dst.clear();
|
||||
return match op {
|
||||
cmp::Operator::Equal => dst,
|
||||
cmp::Operator::NotEqual => {
|
||||
dst.add_range(0, self.num_rows());
|
||||
dst
|
||||
}
|
||||
op => panic!("operator {:?} not expected", op),
|
||||
};
|
||||
}
|
||||
};
|
||||
debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter encoded expr");
|
||||
|
||||
// N.B, the transcoder may have changed the operator depending on the
|
||||
// value provided.
|
||||
match op {
|
||||
cmp::Operator::GT => self.row_ids_cmp_order(&value, PartialOrd::gt, dst),
|
||||
cmp::Operator::GTE => self.row_ids_cmp_order(&value, PartialOrd::ge, dst),
|
||||
cmp::Operator::LT => self.row_ids_cmp_order(&value, PartialOrd::lt, dst),
|
||||
cmp::Operator::LTE => self.row_ids_cmp_order(&value, PartialOrd::le, dst),
|
||||
_ => self.row_ids_equal(&value, op, dst),
|
||||
_ => self.row_ids_equal(&value, &op, dst),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -233,6 +233,11 @@ where
|
|||
fn all_non_null_row_ids(&self, mut dst: RowIDs) -> RowIDs {
|
||||
dst.clear();
|
||||
|
||||
if self.null_count() == 0 {
|
||||
dst.add_range(0, self.num_rows());
|
||||
return dst;
|
||||
}
|
||||
|
||||
let mut found = false;
|
||||
let mut count = 0;
|
||||
for i in 0..self.num_rows() as usize {
|
||||
|
@ -444,14 +449,34 @@ where
|
|||
max.map(|v| self.transcoder.decode(v))
|
||||
}
|
||||
|
||||
fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
|
||||
let value = self.transcoder.encode(value);
|
||||
fn row_ids_filter(&self, value: L, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
|
||||
debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter");
|
||||
let (value, op) = match self.transcoder.encode_comparable(value, *op) {
|
||||
Some((value, op)) => (value, op),
|
||||
None => {
|
||||
// The value is not encodable. This can happen with the == or !=
|
||||
// operator. In the case of ==, no values in the encoding could
|
||||
// possible satisfy the expression. In the case of !=, all
|
||||
// non-null values would satisfy the expression.
|
||||
dst.clear();
|
||||
return match op {
|
||||
cmp::Operator::Equal => dst,
|
||||
cmp::Operator::NotEqual => {
|
||||
dst = self.all_non_null_row_ids(dst);
|
||||
dst
|
||||
}
|
||||
op => panic!("operator {:?} not expected", op),
|
||||
};
|
||||
}
|
||||
};
|
||||
debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter encoded expr");
|
||||
|
||||
match op {
|
||||
cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
|
||||
cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
|
||||
cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
|
||||
cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
|
||||
_ => self.row_ids_equal(value, op, dst),
|
||||
cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
|
||||
cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
|
||||
cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
|
||||
cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
|
||||
_ => self.row_ids_equal(value, &op, dst),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -832,7 +857,7 @@ mod test {
|
|||
(vec![Some(22), None], vec![0]),
|
||||
(
|
||||
vec![Some(22), None, Some(1), None, Some(3), None],
|
||||
vec![0, 3, 4],
|
||||
vec![0, 2, 4],
|
||||
),
|
||||
(vec![Some(22), None, None, Some(33)], vec![0, 3]),
|
||||
(vec![None, None, Some(33)], vec![2]),
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use either::Either;
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
use crate::column::cmp;
|
||||
use crate::column::RowIDs;
|
||||
|
@ -394,15 +395,34 @@ where
|
|||
self.num_rows
|
||||
}
|
||||
|
||||
fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
|
||||
let value = self.transcoder.encode(value);
|
||||
fn row_ids_filter(&self, value: L, op: &cmp::Operator, mut dst: RowIDs) -> RowIDs {
|
||||
debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter");
|
||||
let (value, op) = match self.transcoder.encode_comparable(value, *op) {
|
||||
Some((value, op)) => (value, op),
|
||||
None => {
|
||||
// The value is not encodable. This can happen with the == or !=
|
||||
// operator. In the case of ==, no values in the encoding could
|
||||
// possible satisfy the expression. In the case of !=, all
|
||||
// non-null values would satisfy the expression.
|
||||
dst.clear();
|
||||
return match op {
|
||||
cmp::Operator::Equal => dst,
|
||||
cmp::Operator::NotEqual => {
|
||||
dst = self.all_non_null_row_ids(dst);
|
||||
dst
|
||||
}
|
||||
op => panic!("operator {:?} not expected", op),
|
||||
};
|
||||
}
|
||||
};
|
||||
debug!(value=?value, operator=?op, encoding=?ENCODING_NAME, "row_ids_filter encoded expr");
|
||||
|
||||
match op {
|
||||
cmp::Operator::Equal | cmp::Operator::NotEqual => {
|
||||
self.row_ids_cmp_equal(value, op, dst)
|
||||
}
|
||||
cmp::Operator::LT | cmp::Operator::LTE | cmp::Operator::GT | cmp::Operator::GTE => {
|
||||
self.row_ids_cmp(value, op, dst)
|
||||
}
|
||||
cmp::Operator::GT => self.row_ids_cmp(value, &op, dst),
|
||||
cmp::Operator::GTE => self.row_ids_cmp(value, &op, dst),
|
||||
cmp::Operator::LT => self.row_ids_cmp(value, &op, dst),
|
||||
cmp::Operator::LTE => self.row_ids_cmp(value, &op, dst),
|
||||
_ => self.row_ids_cmp_equal(value, &op, dst),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use crate::column::cmp::Operator;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
fmt::{Debug, Display},
|
||||
|
@ -13,7 +14,17 @@ use std::{
|
|||
// `P` is a physical type that is stored directly within an encoding, `L` is
|
||||
// a logical type callers expect to be returned.
|
||||
pub trait Transcoder<P, L>: Debug + Display {
|
||||
/// A function that encodes a logical value into a physical representation.
|
||||
fn encode(&self, _: L) -> P;
|
||||
|
||||
/// A function that attempts to encode a logical value, within the context
|
||||
/// of a comparison operator, into a physical representation.
|
||||
///
|
||||
/// Implementation should return a suitable operator for the physical
|
||||
/// representation, which may differ from the provided operator.
|
||||
fn encode_comparable(&self, _: L, _: Operator) -> Option<(P, Operator)>;
|
||||
|
||||
/// A function to decode a physical representation back into a logical value.
|
||||
fn decode(&self, _: P) -> L;
|
||||
}
|
||||
|
||||
|
@ -29,6 +40,10 @@ impl<T> Transcoder<T, T> for NoOpTranscoder {
|
|||
v
|
||||
}
|
||||
|
||||
fn encode_comparable(&self, v: T, op: Operator) -> Option<(T, Operator)> {
|
||||
Some((v, op))
|
||||
}
|
||||
|
||||
fn decode(&self, v: T) -> T {
|
||||
v
|
||||
}
|
||||
|
@ -56,13 +71,17 @@ pub struct ByteTrimmer {}
|
|||
impl<P, L> Transcoder<P, L> for ByteTrimmer
|
||||
where
|
||||
L: From<P>,
|
||||
P: TryFrom<L>,
|
||||
P: TryFrom<L> + PartialEq + PartialOrd,
|
||||
<P as TryFrom<L>>::Error: std::fmt::Debug,
|
||||
{
|
||||
fn encode(&self, v: L) -> P {
|
||||
P::try_from(v).unwrap()
|
||||
}
|
||||
|
||||
fn encode_comparable(&self, v: L, op: Operator) -> Option<(P, Operator)> {
|
||||
P::try_from(v).ok().map(|p| (p, op))
|
||||
}
|
||||
|
||||
fn decode(&self, v: P) -> L {
|
||||
L::from(v)
|
||||
}
|
||||
|
@ -91,6 +110,54 @@ macro_rules! make_float_trimmer {
|
|||
v as $type
|
||||
}
|
||||
|
||||
fn encode_comparable(&self, v: f64, op: Operator) -> Option<($type, Operator)> {
|
||||
assert!(v <= <$type>::MAX as f64);
|
||||
if v == ((v as $type) as f64) {
|
||||
return Some((v as $type, op));
|
||||
}
|
||||
|
||||
match op {
|
||||
Operator::Equal => {
|
||||
None // no encoded values will == v
|
||||
}
|
||||
Operator::NotEqual => {
|
||||
None // all encoded values will != v
|
||||
}
|
||||
Operator::LT => {
|
||||
// convert to next highest encodable value. For example
|
||||
// given '< 23.2` return 24.0 encoded as the physical
|
||||
// type. < 23.2 is logically equivalent to < 24.0 since
|
||||
// there are no valid values in the domain (23.2, 24.0).
|
||||
Some((v.ceil() as $type, op))
|
||||
}
|
||||
Operator::LTE => {
|
||||
// convert to next highest encodable value and change
|
||||
// operator to <.
|
||||
// For example given '<= 23.2` return 24.0 encoded as
|
||||
// the physical type. <= 23.2 is logically equivalent
|
||||
// to < 24.0 since there are no valid values in the
|
||||
// domain [23.2, 24.0).
|
||||
Some((v.ceil() as $type, Operator::LT))
|
||||
}
|
||||
Operator::GT => {
|
||||
// convert to next lowest encodable value. For example
|
||||
// given '> 23.2` return 23.0 encoded as the physical
|
||||
// type. > 23.2 is logically equivalent to > 23.0 since
|
||||
// there are no valid values in the domain (23.0, 23.2].
|
||||
Some((v.floor() as $type, op))
|
||||
}
|
||||
Operator::GTE => {
|
||||
// convert to next lowest encodable value and change
|
||||
// operator to >.
|
||||
// For example given '>= 23.2` return 23.0 encoded as
|
||||
// the physical type. >= 23.2 is logically equivalent
|
||||
// to > 24.0 since there are no valid values in the
|
||||
// domain [23.2, 24.0).
|
||||
Some((v.floor() as $type, Operator::GT))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decode(&self, v: $type) -> f64 {
|
||||
v.into()
|
||||
}
|
||||
|
@ -119,7 +186,7 @@ impl Display for FloatByteTrimmer {
|
|||
// result.
|
||||
|
||||
#[cfg(test)]
|
||||
use std::{sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::Arc};
|
||||
use std::{sync::atomic, sync::atomic::AtomicUsize, sync::Arc};
|
||||
#[cfg(test)]
|
||||
/// A mock implementation of Transcoder that tracks calls to encode and decode.
|
||||
/// This is useful for testing encoder implementations.
|
||||
|
@ -127,6 +194,7 @@ use std::{sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::Arc};
|
|||
pub struct MockTranscoder {
|
||||
encoding_calls: AtomicUsize,
|
||||
decoding_calls: AtomicUsize,
|
||||
partial_cmp_calls: AtomicUsize,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -135,6 +203,7 @@ impl Default for MockTranscoder {
|
|||
Self {
|
||||
encoding_calls: AtomicUsize::default(),
|
||||
decoding_calls: AtomicUsize::default(),
|
||||
partial_cmp_calls: AtomicUsize::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -142,23 +211,28 @@ impl Default for MockTranscoder {
|
|||
#[cfg(test)]
|
||||
impl MockTranscoder {
|
||||
pub fn encodings(&self) -> usize {
|
||||
self.encoding_calls.load(Ordering::Relaxed)
|
||||
self.encoding_calls.load(atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn decodings(&self) -> usize {
|
||||
self.decoding_calls.load(Ordering::Relaxed)
|
||||
self.decoding_calls.load(atomic::Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<T> Transcoder<T, T> for MockTranscoder {
|
||||
fn encode(&self, v: T) -> T {
|
||||
self.encoding_calls.fetch_add(1, Ordering::Relaxed);
|
||||
self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
v
|
||||
}
|
||||
|
||||
fn encode_comparable(&self, v: T, op: Operator) -> Option<(T, Operator)> {
|
||||
self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
Some((v, op))
|
||||
}
|
||||
|
||||
fn decode(&self, v: T) -> T {
|
||||
self.decoding_calls.fetch_add(1, Ordering::Relaxed);
|
||||
self.decoding_calls.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
v
|
||||
}
|
||||
}
|
||||
|
@ -166,12 +240,17 @@ impl<T> Transcoder<T, T> for MockTranscoder {
|
|||
#[cfg(test)]
|
||||
impl<T> Transcoder<T, T> for Arc<MockTranscoder> {
|
||||
fn encode(&self, v: T) -> T {
|
||||
self.encoding_calls.fetch_add(1, Ordering::Relaxed);
|
||||
self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
v
|
||||
}
|
||||
|
||||
fn encode_comparable(&self, v: T, op: Operator) -> Option<(T, Operator)> {
|
||||
self.encoding_calls.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
Some((v, op))
|
||||
}
|
||||
|
||||
fn decode(&self, v: T) -> T {
|
||||
self.decoding_calls.fetch_add(1, Ordering::Relaxed);
|
||||
self.decoding_calls.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -767,13 +767,13 @@ mod test {
|
|||
.collect::<Vec<u16>>();
|
||||
|
||||
let cases: Vec<Box<dyn ScalarEncoding<f64>>> = vec![
|
||||
Box::new(RLE::<_, _, _>::new_from_iter(
|
||||
data.iter().cloned(),
|
||||
NoOpTranscoder {},
|
||||
Box::new(RLE::<u16, f64, _>::new_from_iter(
|
||||
data_float_trimmed.iter().cloned(),
|
||||
float_trimmer,
|
||||
)),
|
||||
Box::new(Fixed::<u16, f64, _>::new(
|
||||
data_float_trimmed.clone(),
|
||||
float_trimmer,
|
||||
FloatByteTrimmer {},
|
||||
)),
|
||||
Box::new(FixedNull::<UInt16Type, f64, _>::new(
|
||||
PrimitiveArray::from(data_float_trimmed),
|
||||
|
@ -838,7 +838,98 @@ mod test {
|
|||
|
||||
for (v, op, exp) in cases {
|
||||
let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector());
|
||||
assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v);
|
||||
assert_eq!(
|
||||
dst.unwrap_vector(),
|
||||
&exp,
|
||||
"example '{} {:?}' failed for {:?}",
|
||||
op,
|
||||
v,
|
||||
enc.name()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn row_ids_filter_float_trimmer_with_nulls() {
|
||||
let data = vec![Some(100.0), None, None, Some(200.0), None];
|
||||
|
||||
let float_trimmer = FloatByteTrimmer {};
|
||||
|
||||
let cases: Vec<Box<dyn ScalarEncoding<f64>>> = vec![
|
||||
Box::new(RLE::<u16, f64, _>::new_from_iter_opt(
|
||||
data.iter()
|
||||
.cloned()
|
||||
.map(|x| x.map(|v| float_trimmer.encode(v))),
|
||||
FloatByteTrimmer {},
|
||||
)),
|
||||
Box::new(FixedNull::<UInt16Type, f64, _>::new(
|
||||
data.iter()
|
||||
.cloned()
|
||||
.map(|v| v.map(|v| float_trimmer.encode(v)))
|
||||
.collect(),
|
||||
FloatByteTrimmer {},
|
||||
)),
|
||||
];
|
||||
|
||||
for enc in cases {
|
||||
_row_ids_filter_float_trimmer_with_nulls(enc)
|
||||
}
|
||||
}
|
||||
|
||||
fn _row_ids_filter_float_trimmer_with_nulls(enc: Box<dyn ScalarEncoding<f64>>) {
|
||||
// These cases replicate the behaviour in PG.
|
||||
//
|
||||
// [100.0, NULL, NULL, 200.0]
|
||||
let cases = vec![
|
||||
(100.0, Operator::Equal, vec![0]), // 100.0
|
||||
(100.0, Operator::NotEqual, vec![3]), // 200.0
|
||||
(100.0, Operator::LT, vec![]), //
|
||||
(100.0, Operator::LTE, vec![0]), // 100.0
|
||||
(100.0, Operator::GT, vec![3]), // 200.0
|
||||
(100.0, Operator::GTE, vec![0, 3]), // 100.0, 200.0
|
||||
(200.0, Operator::Equal, vec![3]), // 200.0
|
||||
(200.0, Operator::NotEqual, vec![0]), // 100.0
|
||||
(200.0, Operator::LT, vec![0]), // 100.0
|
||||
(200.0, Operator::LTE, vec![0, 3]), // 100.0, 200.0
|
||||
(200.0, Operator::GT, vec![]), //
|
||||
(200.0, Operator::GTE, vec![3]), // 200.0
|
||||
// Values not present in the column
|
||||
(99.0, Operator::Equal, vec![]), //
|
||||
(99.0, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0
|
||||
(99.0, Operator::LT, vec![]), //
|
||||
(99.0, Operator::LTE, vec![]), //
|
||||
(99.0, Operator::GT, vec![0, 3]), // 100.0, 200.0
|
||||
(99.0, Operator::GTE, vec![0, 3]), // 100.0, 200.0
|
||||
(200.4, Operator::Equal, vec![]), //
|
||||
(200.4, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0
|
||||
(200.4, Operator::LT, vec![0, 3]), // 100.0,200.0
|
||||
(200.4, Operator::LTE, vec![0, 3]), // 100.0, 200.0
|
||||
(200.4, Operator::GT, vec![]), //
|
||||
(200.4, Operator::GTE, vec![]), //
|
||||
(201.0, Operator::Equal, vec![]), //
|
||||
(201.0, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0
|
||||
(201.0, Operator::LT, vec![0, 3]), // 100.0, 200.0
|
||||
(201.0, Operator::LTE, vec![0, 3]), // 100.0, 200.0
|
||||
(201.0, Operator::GT, vec![]), //
|
||||
(201.0, Operator::GTE, vec![]), //
|
||||
(401.0, Operator::Equal, vec![]), //
|
||||
(401.0, Operator::NotEqual, vec![0, 3]), // 100.0, 200.0
|
||||
(401.0, Operator::LT, vec![0, 3]), // 100.0, 200.0
|
||||
(401.0, Operator::LTE, vec![0, 3]), // 100.0, 200.0
|
||||
(401.0, Operator::GT, vec![]), //
|
||||
(401.0, Operator::GTE, vec![]), //
|
||||
];
|
||||
|
||||
for (v, op, exp) in cases {
|
||||
let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector());
|
||||
assert_eq!(
|
||||
dst.unwrap_vector(),
|
||||
&exp,
|
||||
"example '{} {:?}' failed for {:?}",
|
||||
op,
|
||||
v,
|
||||
enc.name()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue