feat: teach read buffer to create RLE float columns
parent
9a666fac00
commit
7525f6e9e3
|
@ -4,6 +4,7 @@ use std::{
|
||||||
cmp::Ordering,
|
cmp::Ordering,
|
||||||
fmt::{Debug, Display},
|
fmt::{Debug, Display},
|
||||||
iter,
|
iter,
|
||||||
|
mem::size_of,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const ENCODING_NAME: &str = "RLE";
|
pub const ENCODING_NAME: &str = "RLE";
|
||||||
|
@ -72,6 +73,24 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The estimated total size in bytes of the underlying values in the
|
||||||
|
/// column if they were stored contiguously and uncompressed. `include_nulls`
|
||||||
|
/// will effectively size each NULL value as size_of<T> if set to `true`.
|
||||||
|
pub fn size_raw(&self, include_nulls: bool) -> usize {
|
||||||
|
let base_size = size_of::<Self>();
|
||||||
|
if include_nulls {
|
||||||
|
return base_size + (self.num_rows() as usize * size_of::<T>());
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove NULL values from calculation
|
||||||
|
base_size
|
||||||
|
+ self
|
||||||
|
.run_lengths
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(rl, v)| v.is_some().then(|| *rl as usize * size_of::<T>()))
|
||||||
|
.sum::<usize>()
|
||||||
|
}
|
||||||
|
|
||||||
/// The number of NULL values in this column.
|
/// The number of NULL values in this column.
|
||||||
pub fn null_count(&self) -> u32 {
|
pub fn null_count(&self) -> u32 {
|
||||||
self.null_count
|
self.null_count
|
||||||
|
@ -448,10 +467,83 @@ impl<T: PartialOrd + Debug + Copy> RLE<T> {
|
||||||
|
|
||||||
impl<T> From<&[T]> for RLE<T>
|
impl<T> From<&[T]> for RLE<T>
|
||||||
where
|
where
|
||||||
T: PartialOrd + Debug,
|
T: PartialOrd + Debug + Default + Copy,
|
||||||
{
|
{
|
||||||
fn from(_v: &[T]) -> Self {
|
fn from(arr: &[T]) -> Self {
|
||||||
todo!()
|
if arr.is_empty() {
|
||||||
|
return Self::default();
|
||||||
|
} else if arr.len() == 1 {
|
||||||
|
let mut enc = Self::default();
|
||||||
|
enc.push(arr[0]);
|
||||||
|
return enc;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut enc = Self::default();
|
||||||
|
let (mut rl, mut v) = (1, arr[0]);
|
||||||
|
for &next in arr.iter().skip(1) {
|
||||||
|
if next == v {
|
||||||
|
rl += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
enc.push_additional(Some(v), rl);
|
||||||
|
rl = 1;
|
||||||
|
v = next;
|
||||||
|
}
|
||||||
|
|
||||||
|
// push the final run length
|
||||||
|
enc.push_additional(Some(v), rl);
|
||||||
|
enc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<Vec<T>> for RLE<T>
|
||||||
|
where
|
||||||
|
T: PartialOrd + Debug + Default + Copy,
|
||||||
|
{
|
||||||
|
fn from(arr: Vec<T>) -> Self {
|
||||||
|
Self::from(arr.as_slice())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<&[Option<T>]> for RLE<T>
|
||||||
|
where
|
||||||
|
T: PartialOrd + Debug + Default + Copy,
|
||||||
|
{
|
||||||
|
fn from(arr: &[Option<T>]) -> Self {
|
||||||
|
if arr.is_empty() {
|
||||||
|
return Self::default();
|
||||||
|
} else if arr.len() == 1 {
|
||||||
|
let mut enc = Self::default();
|
||||||
|
enc.push_additional(arr[0], 1);
|
||||||
|
return enc;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut enc = Self::default();
|
||||||
|
let (mut rl, mut v) = (1, arr[0]);
|
||||||
|
for &next in arr.iter().skip(1) {
|
||||||
|
if next == v {
|
||||||
|
rl += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
enc.push_additional(v, rl);
|
||||||
|
rl = 1;
|
||||||
|
v = next;
|
||||||
|
}
|
||||||
|
|
||||||
|
// push the final run length
|
||||||
|
enc.push_additional(v, rl);
|
||||||
|
enc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<Vec<Option<T>>> for RLE<T>
|
||||||
|
where
|
||||||
|
T: PartialOrd + Debug + Default + Copy,
|
||||||
|
{
|
||||||
|
fn from(arr: Vec<Option<T>>) -> Self {
|
||||||
|
Self::from(arr.as_slice())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,6 +551,50 @@ where
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
// Checks the run lengths produced when encoding plain (non-NULL) slices.
#[test]
fn from_slice() {
    // Each case pairs an input slice with the expected `(run length, value)`
    // entries.
    let cases = vec![
        (&[][..], vec![]),
        (&[1][..], vec![(1, Some(1))]),
        (&[100, 22][..], vec![(1, Some(100)), (1, Some(22))]),
        (
            &[100, 22, 100, 100][..],
            vec![(1, Some(100)), (1, Some(22)), (2, Some(100))],
        ),
    ];

    for (input, expected) in cases {
        let encoded: RLE<u8> = RLE::from(input);
        assert_eq!(encoded.run_lengths, expected);
    }
}
|
||||||
|
|
||||||
|
// Checks the run lengths produced when encoding slices of optional values,
// including runs of NULL (`None`).
#[test]
fn from_slice_opt() {
    // Each case pairs an input slice with the expected `(run length, value)`
    // entries.
    let cases = vec![
        (&[][..], vec![]),
        (&[Some(1)][..], vec![(1, Some(1))]),
        (
            &[Some(100), Some(22)][..],
            vec![(1, Some(100)), (1, Some(22))],
        ),
        (
            &[Some(100), Some(22), Some(100), Some(100)][..],
            vec![(1, Some(100)), (1, Some(22)), (2, Some(100))],
        ),
        (&[None][..], vec![(1, None)]),
        (
            &[None, None, Some(1), None][..],
            vec![(2, None), (1, Some(1)), (1, None)],
        ),
    ];

    for (input, expected) in cases {
        let encoded: RLE<u8> = RLE::from(input);
        assert_eq!(encoded.run_lengths, expected);
    }
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push() {
|
fn push() {
|
||||||
let mut enc = RLE::default();
|
let mut enc = RLE::default();
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::cmp::Ordering;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
|
||||||
use arrow::{self, array::Array};
|
use arrow::{self, array::Array};
|
||||||
|
@ -36,6 +37,7 @@ impl FloatEncoding {
|
||||||
size_of::<Vec<f64>>() + (enc.num_rows() as usize * size_of::<f64>())
|
size_of::<Vec<f64>>() + (enc.num_rows() as usize * size_of::<f64>())
|
||||||
}
|
}
|
||||||
Self::FixedNull64(enc) => enc.size_raw(include_nulls),
|
Self::FixedNull64(enc) => enc.size_raw(include_nulls),
|
||||||
|
Self::RLE64(enc) => enc.size_raw(include_nulls),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,19 +252,78 @@ impl std::fmt::Display for FloatEncoding {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Counts the runs of consecutive equal values in `arr` whose length is
/// strictly greater than `min_rl`.
///
/// Two neighbouring values belong to the same run when `partial_cmp` reports
/// them as `Ordering::Equal`; values that are not comparable (e.g. NaN) never
/// extend a run.
///
/// Returns `0` when `min_rl` is zero or when `arr` holds fewer than `min_rl`
/// values.
fn check_run_lengths_above(arr: &[f64], min_rl: usize) -> usize {
    if min_rl < 1 || arr.len() < min_rl {
        return 0;
    }

    let (mut rl, mut v) = (1, arr[0]);
    let mut total_matching_rl = 0;
    for next in arr.iter().skip(1) {
        if let Some(Ordering::Equal) = v.partial_cmp(next) {
            rl += 1;
            continue;
        }

        // The run just ended; count it if it was long enough.
        if rl > min_rl {
            total_matching_rl += 1;
        }

        rl = 1;
        v = *next;
    }

    // The last run has no differing successor to trigger the check above, so
    // it must be evaluated here; otherwise a trailing (or sole) long run would
    // never be counted.
    if rl > min_rl {
        total_matching_rl += 1;
    }

    total_matching_rl
}
|
||||||
|
|
||||||
/// Converts a slice of `f64` values into a `FloatEncoding`.
|
/// Converts a slice of `f64` values into a `FloatEncoding`.
|
||||||
|
///
|
||||||
|
/// TODO(edd): figure out what sensible heuristics look like.
|
||||||
|
///
|
||||||
|
/// There are two possible encodings for &[f64]:
|
||||||
|
/// * "None": effectively store the slice in a vector;
|
||||||
|
/// * "RLE": for slices that have a sufficiently low cardinality they may
|
||||||
|
/// benefit from being run-length encoded.
|
||||||
|
///
|
||||||
|
/// The encoding is chosen based on the heuristics in the `From` implementation
|
||||||
impl From<&[f64]> for FloatEncoding {
|
impl From<&[f64]> for FloatEncoding {
|
||||||
fn from(arr: &[f64]) -> Self {
|
fn from(arr: &[f64]) -> Self {
|
||||||
|
// The total number of run-lengths to find in order to decide to RLE
|
||||||
|
// this column is in the range `[10, 1/10th column size]`
|
||||||
|
// For example, if the columns is 1000 rows then we need to find 100
|
||||||
|
// run lengths to RLE encode it.
|
||||||
|
let total_rl_required = 10.max(arr.len() / 10);
|
||||||
|
if check_run_lengths_above(arr, 3) >= total_rl_required {
|
||||||
|
return Self::RLE64(RLE::from(arr));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't apply a compression encoding to the column
|
||||||
Self::Fixed64(Fixed::<f64>::from(arr))
|
Self::Fixed64(Fixed::<f64>::from(arr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Converts an Arrow `Float64Array` into a `FloatEncoding`.
|
/// Converts an Arrow Float array into a `FloatEncoding`.
|
||||||
|
///
|
||||||
|
/// TODO(edd): figure out what sensible heuristics look like.
|
||||||
|
///
|
||||||
|
/// There are two possible encodings for an Arrow array:
|
||||||
|
/// * "None": effectively keep the data in its Arrow array;
|
||||||
|
/// * "RLE": for arrays that have a sufficiently large number of NULL values
|
||||||
|
/// they may benefit from being run-length encoded.
|
||||||
|
///
|
||||||
|
/// The encoding is chosen based on the heuristics in the `From` implementation
|
||||||
impl From<arrow::array::Float64Array> for FloatEncoding {
|
impl From<arrow::array::Float64Array> for FloatEncoding {
|
||||||
fn from(arr: arrow::array::Float64Array) -> Self {
|
fn from(arr: arrow::array::Float64Array) -> Self {
|
||||||
if arr.null_count() == 0 {
|
if arr.null_count() == 0 {
|
||||||
return Self::from(arr.values());
|
return Self::from(arr.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(edd) Right now let's just RLE encode the column if it is 50% NULL.
|
||||||
|
if arr.null_count() >= arr.len() / 2 {
|
||||||
|
return Self::RLE64(RLE::from(arr.values()));
|
||||||
|
}
|
||||||
|
|
||||||
Self::FixedNull64(FixedNull::<arrow::datatypes::Float64Type>::from(arr))
|
Self::FixedNull64(FixedNull::<arrow::datatypes::Float64Type>::from(arr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue