diff --git a/read_buffer/src/column/encoding/scalar/rle.rs b/read_buffer/src/column/encoding/scalar/rle.rs index 8332e267ec..66a4304740 100644 --- a/read_buffer/src/column/encoding/scalar/rle.rs +++ b/read_buffer/src/column/encoding/scalar/rle.rs @@ -3,6 +3,7 @@ use crate::column::RowIDs; use std::{ cmp::Ordering, fmt::{Debug, Display}, + iter, }; pub const ENCODING_NAME: &str = "RLE"; @@ -270,34 +271,131 @@ impl RLE { /// Returns the logical value present at the provided row id. /// - /// N.B right now this doesn't discern between an invalid row id and a NULL - /// value at a valid location. - pub fn value(&self, _row_id: u32) -> Option { - todo!() + /// TODO(edd): a sparse index on this can help with materialisation cost by + /// providing starting indexes into the run length collection. + pub fn value(&self, row_id: u32) -> Option { + assert!( + row_id < self.num_rows(), + "row_id {:?} out of bounds for {:?} rows", + row_id, + self.num_rows() + ); + + let mut ordinal_offset = 0; + for (rl, v) in &self.run_lengths { + if ordinal_offset + rl > row_id { + // this run-length overlaps desired row id + return *v; + } + ordinal_offset += rl; + } + + // we are guaranteed to find a value at the provided row_id because + // `row_id < num_rows` + unreachable!( + "could not find value at row ID {:?}. num_rows = {:?}", + row_id, + self.num_rows() + ) + } + + fn check_row_ids_ordered(&self, row_ids: &[u32]) -> bool { + if row_ids.is_empty() { + return true; + } + + let mut last = row_ids[0]; + for &row_id in row_ids.iter().skip(1) { + if row_id <= last { + return false; + } + last = row_id; + } + true } /// Materialises a vector of references to the decoded values in the - /// provided row ids. + /// provided ordered set of row ids. /// - /// NULL values are represented by None. It is the caller's responsibility - /// to ensure row ids are a monotonically increasing set. 
- pub fn values<'a>( - &'a self, - _row_ids: &[u32], - mut _dst: Vec>, - ) -> Vec> { - todo!() + /// NULL values are represented by None. + /// + /// # Panics + /// + /// The behaviour of providing row IDs that are not an ordered set is + /// undefined. `values` may panic if the provided row IDs are not an + /// ordered set in ascending order. + /// + /// Panics if the number of row IDs requested is more than the number of + /// rows in the column. + /// + /// Panics if a requested row ID is out of bounds of the ordinal offset of + /// a logical value. + /// + pub fn values(&self, row_ids: &[u32], mut dst: Vec>) -> Vec> { + assert!( + row_ids.len() <= self.num_rows() as usize, + "more row_ids {:?} than rows {:?}", + row_ids.len(), + self.num_rows() + ); + + dst.clear(); + dst.reserve(row_ids.len()); + + // Ensure row ids ordered + debug_assert!(self.check_row_ids_ordered(row_ids)); + + let mut curr_logical_row_id = 0; + let (mut curr_entry_rl, mut curr_value) = self.run_lengths[0]; + + let mut i = 1; + for &row_id in row_ids { + assert!( + row_id < self.num_rows(), + "row_id {:?} beyond max row {:?}", + row_id, + self.num_rows() - 1 + ); + + while curr_logical_row_id + curr_entry_rl <= row_id { + // this encoded entry does not cover the row we need. + // move on to next entry + curr_logical_row_id += curr_entry_rl; + curr_entry_rl = self.run_lengths[i].0; + curr_value = self.run_lengths[i].1; + + i += 1; + } + + // this encoded entry covers the row_id we want. + dst.push(curr_value); + + curr_logical_row_id += 1; // move forwards a logical row + curr_entry_rl -= 1; + } + + assert_eq!(row_ids.len(), dst.len()); + dst } /// Returns references to the logical (decoded) values for all the rows in /// the column. /// /// NULL values are represented by None. 
- pub fn all_values<'a>(&'a self, mut _dst: Vec>) -> Vec> { - todo!() + pub fn all_values(&self, mut dst: Vec>) -> Vec> { + dst.clear(); + dst.reserve(self.num_rows as usize); + + for (rl, v) in &self.run_lengths { + dst.extend(iter::repeat(v).take(*rl as usize)); + } + dst } /// Returns true if a non-null value exists at any of the row ids. + /// + /// TODO(edd): this needs implementing when we push down NULL predicate + /// support. pub fn has_non_null_value(&self, _row_ids: &[u32]) -> bool { todo!() } @@ -423,13 +521,93 @@ mod test { fn size() {} #[test] - fn value() {} + fn value() { + let mut enc = RLE::default(); + enc.push_none(); + enc.push_additional(Some(45), 3); + enc.push_additional(Some(90), 2); + enc.push(21); + + assert_eq!(enc.value(0), None); + assert_eq!(enc.value(1), Some(45)); + assert_eq!(enc.value(3), Some(45)); + assert_eq!(enc.value(4), Some(90)); + assert_eq!(enc.value(6), Some(21)); + } #[test] - fn values() {} + fn check_row_ids_ordered() { + let cases = vec![ + (&[0, 1, 2][..], true), + (&[0], true), + (&[], true), + (&[0, 2], true), + (&[1, 2], true), + (&[0, 0, 2], false), + (&[0, 1, 0], false), + (&[2, 1, 0], false), + (&[1, 1], false), + (&[1, 2, 2], false), + ]; + + let enc: RLE = RLE::default(); + + for (input, exp) in cases { + assert_eq!(enc.check_row_ids_ordered(input), exp); + } + } #[test] - fn all_values() {} + fn values() { + let mut enc = RLE::default(); + enc.push_none(); + enc.push_additional(Some(45), 3); + enc.push_additional(Some(90), 2); + enc.push(21); + + // ensure buffer cleared by populating it + assert_eq!( + enc.values(&[0, 1, 2], vec![Some(33)]), + vec![None, Some(45), Some(45)] + ); + + assert_eq!( + enc.values(&[0, 1, 2, 3, 4], vec![]), + vec![None, Some(45), Some(45), Some(45), Some(90)] + ); + + assert_eq!(enc.values(&[2, 5], vec![]), vec![Some(45), Some(90)]); + } + + #[test] + fn all_values() { + let mut enc = RLE::default(); + // ensure buffer cleared by populating it + 
assert!(enc.all_values(vec![Some(33)]).is_empty()); + + enc.push_additional(Some(45), 3); + enc.push_additional(Some(90), 2); + enc.push_additional(None, 2); + enc.push(21); + + assert_eq!( + enc.all_values(vec![None, Some(99)]), + vec![ + Some(45), + Some(45), + Some(45), + Some(90), + Some(90), + None, + None, + Some(21) + ] + ); + + let mut enc: RLE = RLE::default(); + enc.push_none(); + assert_eq!(enc.all_values(vec![]), vec![None]); + } #[test] fn row_ids_filter_eq() {}