Merge pull request #1493 from influxdata/er/feat/read_buffer/num_rle
feat: implement Read Buffer run-length encoding for scalarspull/24376/head
commit
02ae69dffc
|
@ -15,7 +15,7 @@ use arrow::array::Array;
|
|||
use crate::schema::LogicalDataType;
|
||||
use crate::value::{EncodedValues, OwnedValue, Scalar, Value, Values};
|
||||
use boolean::BooleanEncoding;
|
||||
use encoding::{bool, scalar};
|
||||
use encoding::bool;
|
||||
use float::FloatEncoding;
|
||||
use integer::IntegerEncoding;
|
||||
use string::StringEncoding;
|
||||
|
@ -1088,13 +1088,13 @@ impl From<arrow::array::Float64Array> for Column {
|
|||
_ => unreachable!("min/max must both be Some or None"),
|
||||
};
|
||||
|
||||
let data = scalar::FixedNull::<arrow::datatypes::Float64Type>::from(arr);
|
||||
let data = FloatEncoding::from(arr);
|
||||
let meta = MetaData {
|
||||
range,
|
||||
..MetaData::default()
|
||||
};
|
||||
|
||||
Self::Float(meta, FloatEncoding::FixedNull64(data))
|
||||
Self::Float(meta, data)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1285,6 +1285,7 @@ impl RowIDs {
|
|||
}
|
||||
}
|
||||
|
||||
// Add all row IDs in the domain `[from, to)` to the collection.
|
||||
pub fn add_range(&mut self, from: u32, to: u32) {
|
||||
match self {
|
||||
Self::Bitmap(ids) => ids.add_range(from as u64..to as u64),
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::convert::TryFrom;
|
||||
use std::{convert::TryFrom, fmt::Display};
|
||||
|
||||
/// Possible comparison operators
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
|
@ -12,6 +12,23 @@ pub enum Operator {
|
|||
LTE,
|
||||
}
|
||||
|
||||
impl Display for Operator {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}",
|
||||
match self {
|
||||
Self::Equal => "=",
|
||||
Self::NotEqual => "!=",
|
||||
Self::GT => ">",
|
||||
Self::GTE => ">=",
|
||||
Self::LT => "<",
|
||||
Self::LTE => "<=",
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for Operator {
|
||||
type Error = String;
|
||||
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
pub mod fixed;
|
||||
pub mod fixed_null;
|
||||
pub mod rle;
|
||||
|
||||
pub use fixed::Fixed;
|
||||
pub use fixed_null::FixedNull;
|
||||
pub use rle::RLE;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,14 +1,25 @@
|
|||
use std::mem::size_of;
|
||||
|
||||
use arrow::{self, array::Array};
|
||||
use std::iter::FromIterator;
|
||||
use std::{cmp::Ordering, mem::size_of};
|
||||
|
||||
use super::encoding::{scalar::Fixed, scalar::FixedNull};
|
||||
use super::encoding::{
|
||||
scalar::Fixed,
|
||||
scalar::{rle::RLE, FixedNull},
|
||||
};
|
||||
use super::{cmp, Statistics};
|
||||
use crate::column::{RowIDs, Scalar, Value, Values};
|
||||
|
||||
#[allow(clippy::upper_case_acronyms)] // TODO(edd): these will be OK in 1.52
|
||||
#[derive(Debug)]
|
||||
pub enum FloatEncoding {
|
||||
// A fixed-width "no compression" vector of non-nullable values
|
||||
Fixed64(Fixed<f64>),
|
||||
|
||||
// A fixed-width "no compression" vector of nullable values (as Arrow array)
|
||||
FixedNull64(FixedNull<arrow::datatypes::Float64Type>),
|
||||
|
||||
// A RLE compressed encoding of nullable values.
|
||||
RLE64(RLE<f64>),
|
||||
}
|
||||
|
||||
impl FloatEncoding {
|
||||
|
@ -17,6 +28,7 @@ impl FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(enc) => enc.size(),
|
||||
Self::FixedNull64(enc) => enc.size(),
|
||||
Self::RLE64(enc) => enc.size(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,6 +42,7 @@ impl FloatEncoding {
|
|||
size_of::<Vec<f64>>() + (enc.num_rows() as usize * size_of::<f64>())
|
||||
}
|
||||
Self::FixedNull64(enc) => enc.size_raw(include_nulls),
|
||||
Self::RLE64(enc) => enc.size_raw(include_nulls),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,6 +51,7 @@ impl FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(enc) => enc.num_rows(),
|
||||
Self::FixedNull64(enc) => enc.num_rows(),
|
||||
Self::RLE64(enc) => enc.num_rows(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,6 +73,7 @@ impl FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(_) => false,
|
||||
Self::FixedNull64(enc) => enc.contains_null(),
|
||||
Self::RLE64(enc) => enc.contains_null(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -67,6 +82,7 @@ impl FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(_) => 0,
|
||||
Self::FixedNull64(enc) => enc.null_count(),
|
||||
Self::RLE64(enc) => enc.null_count(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,6 +91,7 @@ impl FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(_) => true,
|
||||
Self::FixedNull64(enc) => enc.has_any_non_null_value(),
|
||||
Self::RLE64(enc) => enc.has_any_non_null_value(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,6 +101,7 @@ impl FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(_) => !row_ids.is_empty(), // all rows will be non-null
|
||||
Self::FixedNull64(enc) => enc.has_non_null_value(row_ids),
|
||||
Self::RLE64(enc) => enc.has_non_null_value(row_ids),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,6 +113,10 @@ impl FloatEncoding {
|
|||
Some(v) => Value::Scalar(Scalar::F64(v)),
|
||||
None => Value::Null,
|
||||
},
|
||||
Self::RLE64(c) => match c.value(row_id) {
|
||||
Some(v) => Value::Scalar(Scalar::F64(v)),
|
||||
None => Value::Null,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,6 +127,7 @@ impl FloatEncoding {
|
|||
match &self {
|
||||
Self::Fixed64(c) => Values::F64(c.values::<f64>(row_ids, vec![])),
|
||||
Self::FixedNull64(c) => Values::F64N(c.values(row_ids, vec![])),
|
||||
Self::RLE64(c) => Values::F64N(c.values(row_ids, vec![])),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,6 +138,7 @@ impl FloatEncoding {
|
|||
match &self {
|
||||
Self::Fixed64(c) => Values::F64(c.all_values::<f64>(vec![])),
|
||||
Self::FixedNull64(c) => Values::F64N(c.all_values(vec![])),
|
||||
Self::RLE64(c) => Values::F64N(c.all_values(vec![])),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,6 +151,7 @@ impl FloatEncoding {
|
|||
match &self {
|
||||
Self::Fixed64(c) => c.row_ids_filter(value.as_f64(), op, dst),
|
||||
Self::FixedNull64(c) => c.row_ids_filter(value.as_f64(), op, dst),
|
||||
Self::RLE64(c) => c.row_ids_filter(value.as_f64(), op, dst),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,6 +171,9 @@ impl FloatEncoding {
|
|||
c.row_ids_filter_range((low.1.as_f64(), &low.0), (high.1.as_f64(), &high.0), dst)
|
||||
}
|
||||
Self::FixedNull64(_) => todo!(),
|
||||
Self::RLE64(c) => {
|
||||
c.row_ids_filter_range((low.1.as_f64(), &low.0), (high.1.as_f64(), &high.0), dst)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,6 +184,10 @@ impl FloatEncoding {
|
|||
Some(v) => Value::Scalar(Scalar::F64(v)),
|
||||
None => Value::Null,
|
||||
},
|
||||
Self::RLE64(c) => match c.min(row_ids) {
|
||||
Some(v) => Value::Scalar(Scalar::F64(v)),
|
||||
None => Value::Null,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -166,6 +198,10 @@ impl FloatEncoding {
|
|||
Some(v) => Value::Scalar(Scalar::F64(v)),
|
||||
None => Value::Null,
|
||||
},
|
||||
Self::RLE64(c) => match c.max(row_ids) {
|
||||
Some(v) => Value::Scalar(Scalar::F64(v)),
|
||||
None => Value::Null,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,6 +212,10 @@ impl FloatEncoding {
|
|||
Some(v) => Scalar::F64(v),
|
||||
None => Scalar::Null,
|
||||
},
|
||||
Self::RLE64(c) => match c.sum(row_ids) {
|
||||
Some(v) => Scalar::F64(v),
|
||||
None => Scalar::Null,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -183,6 +223,7 @@ impl FloatEncoding {
|
|||
match &self {
|
||||
Self::Fixed64(c) => c.count(row_ids),
|
||||
Self::FixedNull64(c) => c.count(row_ids),
|
||||
Self::RLE64(c) => c.count(row_ids),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,6 +232,7 @@ impl FloatEncoding {
|
|||
match &self {
|
||||
Self::Fixed64(_) => "None",
|
||||
Self::FixedNull64(_) => "None",
|
||||
Self::RLE64(enc) => enc.name(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -199,6 +241,7 @@ impl FloatEncoding {
|
|||
match &self {
|
||||
Self::Fixed64(_) => "f64",
|
||||
Self::FixedNull64(_) => "f64",
|
||||
Self::RLE64(_) => "f64",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -209,30 +252,100 @@ impl std::fmt::Display for FloatEncoding {
|
|||
match self {
|
||||
Self::Fixed64(enc) => write!(f, "[{}]: {}", name, enc),
|
||||
Self::FixedNull64(enc) => write!(f, "[{}]: {}", name, enc),
|
||||
Self::RLE64(enc) => write!(f, "[{}]: {}", name, enc),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn rle_rows(arr: &[f64]) -> usize {
|
||||
arr.len()
|
||||
- arr
|
||||
.iter()
|
||||
.zip(arr.iter().skip(1))
|
||||
.filter(|(curr, next)| matches!(curr.partial_cmp(next), Some(Ordering::Equal)))
|
||||
.count()
|
||||
}
|
||||
|
||||
fn rle_rows_opt(mut itr: impl Iterator<Item = Option<f64>>) -> usize {
|
||||
let mut v = match itr.next() {
|
||||
Some(v) => v,
|
||||
None => return 0,
|
||||
};
|
||||
|
||||
let mut total_rows = 0;
|
||||
for next in itr {
|
||||
if let Some(Ordering::Equal) = v.partial_cmp(&next) {
|
||||
continue;
|
||||
}
|
||||
|
||||
total_rows += 1;
|
||||
v = next;
|
||||
}
|
||||
|
||||
total_rows + 1 // account for original run
|
||||
}
|
||||
|
||||
/// A lever to decide the minimum size in bytes that RLE the column needs to
|
||||
/// reduce the overall footprint by. 0.1 means that the size of the column must
|
||||
/// be reduced by 10%
|
||||
pub const MIN_RLE_SIZE_REDUCTION: f64 = 0.3; // 30%
|
||||
|
||||
/// Converts a slice of `f64` values into a `FloatEncoding`.
|
||||
///
|
||||
/// There are two possible encodings for &[f64]:
|
||||
/// * "None": effectively store the slice in a vector;
|
||||
/// * "RLE": for slices that have a sufficiently low cardinality they may
|
||||
/// benefit from being run-length encoded.
|
||||
///
|
||||
/// The encoding is chosen based on the heuristics in the `From` implementation
|
||||
impl From<&[f64]> for FloatEncoding {
|
||||
fn from(arr: &[f64]) -> Self {
|
||||
// The number of rows we would reduce the column by if we encoded it
|
||||
// as RLE.
|
||||
let base_size = arr.len() * size_of::<f64>();
|
||||
let rle_size = rle_rows(arr) * size_of::<(u32, Option<f64>)>(); // size of a run length
|
||||
if (base_size as f64 - rle_size as f64) / base_size as f64 >= MIN_RLE_SIZE_REDUCTION {
|
||||
return Self::RLE64(RLE::from_iter(arr.to_vec()));
|
||||
}
|
||||
|
||||
// Don't apply a compression encoding to the column
|
||||
Self::Fixed64(Fixed::<f64>::from(arr))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an Arrow `Float64Array` into a `FloatEncoding`.
|
||||
/// Converts an Arrow Float array into a `FloatEncoding`.
|
||||
///
|
||||
/// There are two possible encodings for an Arrow array:
|
||||
/// * "None": effectively keep the data in its Arrow array;
|
||||
/// * "RLE": for arrays that have a sufficiently large number of NULL values
|
||||
/// they may benefit from being run-length encoded.
|
||||
///
|
||||
/// The encoding is chosen based on the heuristics in the `From` implementation
|
||||
impl From<arrow::array::Float64Array> for FloatEncoding {
|
||||
fn from(arr: arrow::array::Float64Array) -> Self {
|
||||
if arr.null_count() == 0 {
|
||||
return Self::from(arr.values());
|
||||
}
|
||||
|
||||
// The number of rows we would reduce the column by if we encoded it
|
||||
// as RLE.
|
||||
let base_size = arr.len() * size_of::<f64>();
|
||||
let rle_size = rle_rows_opt(arr.iter()) * size_of::<(u32, Option<f64>)>(); // size of a run length
|
||||
if (base_size as f64 - rle_size as f64) / base_size as f64 >= MIN_RLE_SIZE_REDUCTION {
|
||||
return Self::RLE64(RLE::from_iter(&arr));
|
||||
}
|
||||
|
||||
Self::FixedNull64(FixedNull::<arrow::datatypes::Float64Type>::from(arr))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::iter;
|
||||
|
||||
use super::*;
|
||||
use arrow::array::Float64Array;
|
||||
use cmp::Operator;
|
||||
|
||||
#[test]
|
||||
fn size_raw() {
|
||||
|
@ -255,4 +368,114 @@ mod test {
|
|||
assert_eq!(enc.size_raw(true), 64);
|
||||
assert_eq!(enc.size_raw(false), 56);
|
||||
}
|
||||
|
||||
fn rle_rows() {
|
||||
let cases = vec![
|
||||
(vec![0.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], 9),
|
||||
(vec![0.0, 0.0], 1),
|
||||
(vec![1.0, 2.0, 1.0], 3),
|
||||
(vec![1.0, 2.0, 1.0, 1.0], 3),
|
||||
(vec![1.0], 1),
|
||||
];
|
||||
|
||||
for (input, exp) in cases {
|
||||
assert_eq!(super::rle_rows(input.as_slice()), exp);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rle_rows_opt() {
|
||||
let cases = vec![
|
||||
(vec![Some(0.0), Some(2.0), Some(1.0)], 3),
|
||||
(vec![Some(0.0), Some(0.0)], 1),
|
||||
];
|
||||
|
||||
for (input, exp) in cases {
|
||||
assert_eq!(super::rle_rows_opt(input.into_iter()), exp);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_arrow_array() {
|
||||
// Rows not reduced
|
||||
let input: Vec<Option<f64>> = vec![Some(33.2), Some(1.2), Some(2.2), None, Some(3.2)];
|
||||
let arr = Float64Array::from(input);
|
||||
let enc = FloatEncoding::from(arr);
|
||||
assert!(matches!(enc, FloatEncoding::FixedNull64(_)));
|
||||
|
||||
// Rows not reduced and no nulls so can go in `Fixed64`.
|
||||
let input: Vec<Option<f64>> = vec![Some(33.2), Some(1.2), Some(2.2), Some(3.2)];
|
||||
let arr = Float64Array::from(input);
|
||||
let enc = FloatEncoding::from(arr);
|
||||
assert!(matches!(enc, FloatEncoding::Fixed64(_)));
|
||||
|
||||
// Goldilocks - encode as RLE
|
||||
let input: Vec<Option<f64>> = vec![Some(33.2); 10];
|
||||
let arr = Float64Array::from(input);
|
||||
let enc = FloatEncoding::from(arr);
|
||||
assert!(matches!(enc, FloatEncoding::RLE64(_)));
|
||||
|
||||
// Goldilocks - encode as RLE
|
||||
let mut input: Vec<Option<f64>> = vec![Some(33.2); 10];
|
||||
input.extend(iter::repeat(None).take(10));
|
||||
let arr = Float64Array::from(input);
|
||||
let enc = FloatEncoding::from(arr);
|
||||
assert!(matches!(enc, FloatEncoding::RLE64(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
// Test NaN behaviour when `FloatEncoder`s are used.
|
||||
//
|
||||
// TODO(edd): I need to add the correct comparators to the scalar encodings
|
||||
// so that they behave the same as PG
|
||||
//
|
||||
fn row_ids_filter_nan() {
|
||||
let data = vec![22.3, f64::NAN, f64::NEG_INFINITY, f64::INFINITY];
|
||||
|
||||
let cases = vec![
|
||||
FloatEncoding::RLE64(RLE::from_iter(data.clone())),
|
||||
FloatEncoding::Fixed64(Fixed::from(data.as_slice())),
|
||||
FloatEncoding::FixedNull64(FixedNull::from(data.as_slice())),
|
||||
];
|
||||
|
||||
for enc in cases {
|
||||
_row_ids_filter_nan(enc)
|
||||
}
|
||||
}
|
||||
|
||||
fn _row_ids_filter_nan(_enc: FloatEncoding) {
|
||||
// These cases replicate the behaviour in PG.
|
||||
let cases = vec![
|
||||
(2.0, Operator::Equal, vec![0]), // 22.3
|
||||
(2.0, Operator::NotEqual, vec![0, 1, 2, 3]), // 22.3, NaN, -∞, ∞
|
||||
(2.0, Operator::LT, vec![2]), // -∞
|
||||
(2.0, Operator::LTE, vec![2]), // -∞
|
||||
(2.0, Operator::GT, vec![0, 1, 3]), // 22.3, NaN, ∞
|
||||
(2.0, Operator::GTE, vec![0, 1, 3]), // 22.3, NaN, ∞
|
||||
(f64::NAN, Operator::Equal, vec![1]), // NaN
|
||||
(f64::NAN, Operator::NotEqual, vec![0, 2, 3]), // 22.3, -∞, ∞
|
||||
(f64::NAN, Operator::LT, vec![0, 2, 3]), // 22.3, -∞, ∞
|
||||
(f64::NAN, Operator::LTE, vec![0, 1, 2, 3]), // 22.3, NaN, -∞, ∞
|
||||
(f64::NAN, Operator::GT, vec![]), //
|
||||
(f64::NAN, Operator::GTE, vec![1]), // NaN
|
||||
(f64::INFINITY, Operator::Equal, vec![3]), // ∞
|
||||
(f64::INFINITY, Operator::NotEqual, vec![0, 1, 2]), // 22.3, NaN, -∞
|
||||
(f64::INFINITY, Operator::LT, vec![0, 2]), // 22.3, -∞
|
||||
(f64::INFINITY, Operator::LTE, vec![0, 2, 3]), // 22.3, -∞, ∞
|
||||
(f64::INFINITY, Operator::GT, vec![1]), // NaN
|
||||
(f64::INFINITY, Operator::GTE, vec![1, 3]), // NaN, ∞
|
||||
(f64::NEG_INFINITY, Operator::Equal, vec![2]), // -∞
|
||||
(f64::NEG_INFINITY, Operator::NotEqual, vec![0, 1, 3]), // 22.3, NaN, ∞
|
||||
(f64::NEG_INFINITY, Operator::LT, vec![]), //
|
||||
(f64::NEG_INFINITY, Operator::LTE, vec![2]), // -∞
|
||||
(f64::NEG_INFINITY, Operator::GT, vec![0, 1, 3]), // 22.3, NaN, ∞
|
||||
(f64::NEG_INFINITY, Operator::GTE, vec![0, 1, 2, 3]), // 22.3, NaN, -∞, ∞
|
||||
];
|
||||
|
||||
// TODO(edd): I need to add support for PG-like comparators for NaN etc.
|
||||
for (_v, _op, _exp) in cases {
|
||||
//let dst = enc.row_ids_filter(v, &op, RowIDs::new_vector());
|
||||
//assert_eq!(dst.unwrap_vector(), &exp, "example '{} {:?}' failed", op, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue