diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index 66a4304740..bcc648b16a 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -4,6 +4,7 @@ use std::{ cmp::Ordering, fmt::{Debug, Display}, iter, + mem::size_of, }; pub const ENCODING_NAME: &str = "RLE"; @@ -72,6 +73,24 @@ impl RLE { todo!() } + /// The estimated total size in bytes of the underlying values in the + /// column if they were stored contiguously and uncompressed. `include_nulls` + /// will effectively size each NULL value as size_of if set to `true`. + pub fn size_raw(&self, include_nulls: bool) -> usize { + let base_size = size_of::(); + if include_nulls { + return base_size + (self.num_rows() as usize * size_of::()); + } + + // remove NULL values from calculation + base_size + + self + .run_lengths + .iter() + .filter_map(|(rl, v)| v.is_some().then(|| *rl as usize * size_of::())) + .sum::() + } + /// The number of NULL values in this column. pub fn null_count(&self) -> u32 { self.null_count @@ -448,10 +467,83 @@ impl RLE { impl From<&[T]> for RLE where - T: PartialOrd + Debug, + T: PartialOrd + Debug + Default + Copy, { - fn from(_v: &[T]) -> Self { - todo!() + fn from(arr: &[T]) -> Self { + if arr.is_empty() { + return Self::default(); + } else if arr.len() == 1 { + let mut enc = Self::default(); + enc.push(arr[0]); + return enc; + } + + let mut enc = Self::default(); + let (mut rl, mut v) = (1, arr[0]); + for &next in arr.iter().skip(1) { + if next == v { + rl += 1; + continue; + } + + enc.push_additional(Some(v), rl); + rl = 1; + v = next; + } + + // push the final run length + enc.push_additional(Some(v), rl); + enc + } +} + +impl From> for RLE +where + T: PartialOrd + Debug + Default + Copy, +{ + fn from(arr: Vec) -> Self { + Self::from(arr.as_slice()) + } +} + +impl From<&[Option]> for RLE +where + T: PartialOrd + Debug + Default + Copy, +{ + fn from(arr: &[Option]) -> Self { + if arr.is_empty() { + return Self::default(); + } else if arr.len() == 1 { + let mut enc = Self::default(); + enc.push_additional(arr[0], 1); + return enc; + } + + let mut enc = Self::default(); + let (mut rl, mut v) = (1, arr[0]); + for &next in arr.iter().skip(1) { + if next == v { + rl += 1; + continue; + } + + enc.push_additional(v, rl); + rl = 1; + v = next; + } + + // push the final run length + enc.push_additional(v, rl); + enc + } +} + +impl From>> for RLE +where + T: PartialOrd + Debug + Default + Copy, +{ + fn from(arr: Vec>) -> Self { + Self::from(arr.as_slice()) } } @@ -459,6 +551,50 @@ where mod test { use super::*; + #[test] + fn from_slice() { + let cases = vec![ + (&[][..], vec![]), + (&[1][..], vec![(1, Some(1))]), + (&[100, 22][..], vec![(1, Some(100)), (1, Some(22))]), + ( + &[100, 22, 100, 100][..], + vec![(1, Some(100)), (1, Some(22)), (2, Some(100))], + ), + ]; + + for (input, exp_rl) in cases { + let enc: RLE = RLE::from(input); + assert_eq!(enc.run_lengths, exp_rl); + } + } + + #[test] + fn from_slice_opt() { + let cases = vec![ + (&[][..], vec![]), + (&[Some(1)][..], vec![(1, Some(1))]), + ( + &[Some(100), Some(22)][..], + vec![(1, Some(100)), (1, Some(22))], + ), + ( + &[Some(100), Some(22), Some(100), Some(100)][..], + vec![(1, Some(100)), (1, Some(22)), (2, Some(100))], + ), + (&[None][..], vec![(1, None)]), + ( + &[None, None, Some(1), None][..], + vec![(2, None), (1, Some(1)), (1, None)], + ), + ]; + + for (input, exp_rl) in cases { + let enc: RLE = RLE::from(input); + assert_eq!(enc.run_lengths, exp_rl); + } + } + #[test] fn push() { let mut enc = RLE::default(); diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs index f3f3111b0c..c9b05a93e9 100644 --- a/read_buffer/src/column/float.rs +++ b/read_buffer/src/column/float.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::mem::size_of; use arrow::{self, array::Array}; @@ -36,6 +37,7 @@ impl FloatEncoding { size_of::>() + (enc.num_rows() as usize * size_of::()) } Self::FixedNull64(enc) => enc.size_raw(include_nulls), + Self::RLE64(enc) => enc.size_raw(include_nulls), } } @@ -250,19 +252,78 @@ impl std::fmt::Display for FloatEncoding { } } +fn check_run_lengths_above(arr: &[f64], min_rl: usize) -> usize { + if min_rl < 1 || arr.len() < min_rl { + return 0; + } + + let (mut rl, mut v) = (1, arr[0]); + let mut total_matching_rl = 0; + for next in arr.iter().skip(1) { + if let Some(Ordering::Equal) = v.partial_cmp(next) { + rl += 1; + continue; + } + + // run length was big enough to be considered + if rl > min_rl { + total_matching_rl += 1; + } + + rl = 1; + v = *next; + } + + total_matching_rl +} + /// Converts a slice of `f64` values into a `FloatEncoding`. +/// +/// TODO(edd): figure out what sensible heuristics look like. +/// +/// There are two possible encodings for &[f64]: +/// * "None": effectively store the slice in a vector; +/// * "RLE": for slices that have a sufficiently low cardinality they may +/// benefit from being run-length encoded. +/// +/// The encoding is chosen based on the heuristics in the `From` implementation impl From<&[f64]> for FloatEncoding { fn from(arr: &[f64]) -> Self { + // The total number of run-lengths to find in order to decide to RLE + // this column is in the range `[10, 1/10th column size]` + // For example, if the columns is 1000 rows then we need to find 100 + // run lengths to RLE encode it. + let total_rl_required = 10.max(arr.len() / 10); + if check_run_lengths_above(arr, 3) >= total_rl_required { + return Self::RLE64(RLE::from(arr)); + } + + // Don't apply a compression encoding to the column Self::Fixed64(Fixed::::from(arr)) } } -/// Converts an Arrow `Float64Array` into a `FloatEncoding`. +/// Converts an Arrow Float array into a `FloatEncoding`. +/// +/// TODO(edd): figure out what sensible heuristics look like. +/// +/// There are two possible encodings for an Arrow array: +/// * "None": effectively keep the data in its Arrow array; +/// * "RLE": for arrays that have a sufficiently large number of NULL values +/// they may benefit from being run-length encoded. +/// +/// The encoding is chosen based on the heuristics in the `From` implementation impl From for FloatEncoding { fn from(arr: arrow::array::Float64Array) -> Self { if arr.null_count() == 0 { return Self::from(arr.values()); } + + // TODO(edd) Right now let's just RLE encode the column if it is 50% NULL. + if arr.null_count() >= arr.len() / 2 { + return Self::RLE64(RLE::from(arr.values())); + } + Self::FixedNull64(FixedNull::::from(arr)) } }