From 71598d9b3e6c7614976ffa3237a51ee5fe8d078c Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Tue, 1 Jun 2021 11:57:31 +0100
Subject: [PATCH] refactor: move rle heuristics to rle module

---
 read_buffer/src/column/encoding/scalar/rle.rs | 62 ++++++++++++++++++
 read_buffer/src/column/float.rs               | 65 ++-----------------
 2 files changed, 66 insertions(+), 61 deletions(-)

diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs
index 37d4832782..31007ecf2b 100644
--- a/read_buffer/src/column/encoding/scalar/rle.rs
+++ b/read_buffer/src/column/encoding/scalar/rle.rs
@@ -573,6 +573,40 @@ where
     }
 }
 
+// This function returns an estimated size in bytes for an input slice of `T`
+// were it to be run-length encoded.
+pub fn estimated_size_from<T: PartialOrd>(arr: &[T]) -> usize {
+    let run_lengths = arr.len()
+        - arr
+            .iter()
+            .zip(arr.iter().skip(1))
+            .filter(|(curr, next)| matches!(curr.partial_cmp(next), Some(Ordering::Equal)))
+            .count();
+    run_lengths * size_of::<(u32, Option<T>)>() + size_of::<Vec<(u32, Option<T>)>>()
+}
+
+// This function returns an estimated size in bytes for an input iterator
+// yielding `Option<T>`, were it to be run-length encoded.
+pub fn estimated_size_from_iter<T: PartialOrd>(mut itr: impl Iterator<Item = Option<T>>) -> usize {
+    let mut v = match itr.next() {
+        Some(v) => v,
+        None => return 0,
+    };
+
+    let mut total_rows = 0;
+    for next in itr {
+        if let Some(Ordering::Equal) = v.partial_cmp(&next) {
+            continue;
+        }
+
+        total_rows += 1;
+        v = next;
+    }
+
+    // +1 to account for original run
+    (total_rows + 1) * size_of::<(u32, Option<T>)>() + size_of::<Vec<(u32, Option<T>)>>()
+}
+
 #[cfg(test)]
 mod test {
     use cmp::Operator;
@@ -974,4 +1008,32 @@ mod test {
         }
         assert_eq!(transcoder.encodings(), calls * 2);
     }
+
+    #[test]
+    fn estimated_size_from() {
+        let cases = vec![
+            (vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0, 0.0, 0.0, 0.0], 192),
+            (vec![0.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], 240),
+            (vec![0.0, 0.0], 48),
+            (vec![1.0, 2.0, 1.0], 96),
+            (vec![1.0, 2.0, 1.0, 1.0], 96),
+            (vec![1.0], 48),
+        ];
+
+        for (input, exp) in cases {
+            assert_eq!(super::estimated_size_from(input.as_slice()), exp);
+        }
+    }
+
+    #[test]
+    fn estimated_size_from_iter() {
+        let cases = vec![
+            (vec![Some(0.0), Some(2.0), Some(1.0)], 96),
+            (vec![Some(0.0), Some(0.0)], 48),
+        ];
+
+        for (input, exp) in cases {
+            assert_eq!(super::estimated_size_from_iter(input.into_iter()), exp);
+        }
+    }
 }
diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs
index 73b35b01b0..a6193c28a4 100644
--- a/read_buffer/src/column/float.rs
+++ b/read_buffer/src/column/float.rs
@@ -1,7 +1,8 @@
 use arrow::array::Array;
 use arrow::datatypes::Float64Type;
-use std::{cmp::Ordering, mem::size_of};
+use std::mem::size_of;
 
+use super::encoding::scalar::rle;
 use super::encoding::scalar::transcoders::NoOpTranscoder;
 use super::encoding::scalar::ScalarEncoding;
 use super::encoding::{
@@ -202,36 +203,6 @@ impl std::fmt::Display for FloatEncoding {
     }
 }
 
-// helper to determine how many rows the slice would have if it were RLE
-// encoded.
-fn rle_rows(arr: &[f64]) -> usize {
-    arr.len()
-        - arr
-            .iter()
-            .zip(arr.iter().skip(1))
-            .filter(|(curr, next)| matches!(curr.partial_cmp(next), Some(Ordering::Equal)))
-            .count()
-}
-
-fn rle_rows_opt(mut itr: impl Iterator<Item = Option<f64>>) -> usize {
-    let mut v = match itr.next() {
-        Some(v) => v,
-        None => return 0,
-    };
-
-    let mut total_rows = 0;
-    for next in itr {
-        if let Some(Ordering::Equal) = v.partial_cmp(&next) {
-            continue;
-        }
-
-        total_rows += 1;
-        v = next;
-    }
-
-    total_rows + 1 // account for original run
-}
-
 /// A lever to decide the minimum size in bytes that RLE the column needs to
 /// reduce the overall footprint by. 0.1 means that the size of the column must
 /// be reduced by 10%
@@ -250,7 +221,7 @@ impl From<&[f64]> for FloatEncoding {
         // The number of rows we would reduce the column by if we encoded it
         // as RLE.
         let base_size = arr.len() * size_of::<f64>();
-        let rle_size = rle_rows(arr) * size_of::<(u32, Option<f64>)>(); // size of a run length
+        let rle_size = rle::estimated_size_from(arr); // estimated size of the RLE-encoded column
         if (base_size as f64 - rle_size as f64) / base_size as f64 >= MIN_RLE_SIZE_REDUCTION {
             let enc = Box::new(RLE::new_from_iter(
                 arr.iter().cloned(),
@@ -282,7 +253,7 @@ impl From<arrow::array::Float64Array> for FloatEncoding {
         // The number of rows we would reduce the column by if we encoded it
         // as RLE.
         let base_size = arr.len() * size_of::<f64>();
-        let rle_size = rle_rows_opt(arr.iter()) * size_of::<(u32, Option<f64>)>(); // size of a run length
+        let rle_size = rle::estimated_size_from_iter(arr.iter()); // estimated size of the RLE-encoded column
         if (base_size as f64 - rle_size as f64) / base_size as f64 >= MIN_RLE_SIZE_REDUCTION {
             let enc = Box::new(RLE::new_from_iter_opt(
                 arr.iter(),
@@ -310,34 +281,6 @@ mod test {
     use crate::column::encoding::scalar::{fixed, fixed_null, rle};
     use cmp::Operator;
 
-    #[test]
-    fn rle_rows() {
-        let cases = vec![
-            (vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0, 0.0, 0.0, 0.0], 7),
-            (vec![0.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], 9),
-            (vec![0.0, 0.0], 1),
-            (vec![1.0, 2.0, 1.0], 3),
-            (vec![1.0, 2.0, 1.0, 1.0], 3),
-            (vec![1.0], 1),
-        ];
-
-        for (input, exp) in cases {
-            assert_eq!(super::rle_rows(input.as_slice()), exp);
-        }
-    }
-
-    #[test]
-    fn rle_rows_opt() {
-        let cases = vec![
-            (vec![Some(0.0), Some(2.0), Some(1.0)], 3),
-            (vec![Some(0.0), Some(0.0)], 1),
-        ];
-
-        for (input, exp) in cases {
-            assert_eq!(super::rle_rows_opt(input.into_iter()), exp);
-        }
-    }
-
     #[test]
     fn from_arrow_array() {
         // Rows not reduced
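
Note (not part of the patch): a minimal sketch of how the relocated heuristic is used. The read buffer only picks RLE when the estimated run-length footprint beats the plain fixed encoding by at least MIN_RLE_SIZE_REDUCTION (0.1, i.e. 10%, in float.rs). The helper name estimated_rle_size below is a hypothetical stand-in that mirrors rle::estimated_size_from for a concrete f64 column so the sketch is self-contained.

use std::cmp::Ordering;
use std::mem::size_of;

// Hypothetical stand-in mirroring rle::estimated_size_from for f64: one
// (u32, Option<f64>) entry per run plus the Vec header that holds the runs.
fn estimated_rle_size(arr: &[f64]) -> usize {
    let runs = arr.len()
        - arr
            .iter()
            .zip(arr.iter().skip(1))
            .filter(|(curr, next)| matches!(curr.partial_cmp(next), Some(Ordering::Equal)))
            .count();
    runs * size_of::<(u32, Option<f64>)>() + size_of::<Vec<(u32, Option<f64>)>>()
}

fn main() {
    // Threshold mirroring MIN_RLE_SIZE_REDUCTION in float.rs: RLE must shave
    // off at least 10% of the fixed-encoding footprint to be worthwhile.
    const MIN_RLE_SIZE_REDUCTION: f64 = 0.1;

    let arr = vec![1.0_f64; 1_000]; // highly repetitive column: a single run
    let base_size = arr.len() * size_of::<f64>();
    let rle_size = estimated_rle_size(&arr);

    let reduction = (base_size as f64 - rle_size as f64) / base_size as f64;
    if reduction >= MIN_RLE_SIZE_REDUCTION {
        println!("would RLE encode: {} -> {} bytes", base_size, rle_size);
    } else {
        println!("would keep fixed encoding at {} bytes", base_size);
    }
}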