diff --git a/segment_store/src/segment.rs b/segment_store/src/segment.rs index 0364662fc0..cd52944b87 100644 --- a/segment_store/src/segment.rs +++ b/segment_store/src/segment.rs @@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::BTreeMap}; use arrow_deps::arrow::datatypes::SchemaRef; use crate::column::{ - cmp::Operator, Column, OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, + cmp::Operator, Column, OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator, }; /// The name used for a timestamp column. @@ -169,9 +169,9 @@ impl Segment { &self, columns: &[ColumnName<'a>], predicates: &[Predicate<'_>], - ) -> Vec<(ColumnName<'a>, Values)> { + ) -> ReadFilterResult<'a> { let row_ids = self.row_ids_from_predicates(predicates); - self.materialise_rows(columns, row_ids) + ReadFilterResult(self.materialise_rows(columns, row_ids)) } fn materialise_rows<'a>( @@ -434,50 +434,71 @@ impl MetaData { } } -#[cfg(test)] -mod test { - use super::*; - use crate::column::ValuesIterator; +/// Encapsulates results from segments with a structure that makes them easier +/// to work with and display. +pub struct ReadFilterResult<'a>(pub Vec<(ColumnName<'a>, Values)>); - fn stringify_read_filter_results(results: Vec<(ColumnName<'_>, Values)>) -> String { - let mut out = String::new(); +impl<'a> ReadFilterResult<'a> { + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl<'a> std::fmt::Debug for &ReadFilterResult<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // header line. - for (i, (k, _)) in results.iter().enumerate() { - out.push_str(k); - if i < results.len() - 1 { - out.push(','); + for (i, (k, _)) in self.0.iter().enumerate() { + write!(f, "{}", k)?; + + if i < self.0.len() - 1 { + write!(f, ",")?; } } - out.push('\n'); + writeln!(f)?; - // TODO: handle empty results? - let expected_rows = results[0].1.len(); + // Display the rest of the values. + std::fmt::Display::fmt(&self, f) + } +} + +impl<'a> std::fmt::Display for &ReadFilterResult<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.is_empty() { + return Ok(()); + } + + let expected_rows = self.0[0].1.len(); let mut rows = 0; - let mut iter_map = results + let mut iter_map = self + .0 .iter() .map(|(k, v)| (*k, ValuesIterator::new(v))) .collect::>>(); while rows < expected_rows { if rows > 0 { - out.push('\n'); + writeln!(f)?; } - for (i, (k, _)) in results.iter().enumerate() { + for (i, (k, _)) in self.0.iter().enumerate() { if let Some(itr) = iter_map.get_mut(k) { - out.push_str(&format!("{}", itr.next().unwrap())); - if i < results.len() - 1 { - out.push(','); + write!(f, "{}", itr.next().unwrap())?; + if i < self.0.len() - 1 { + write!(f, ",")?; } } } rows += 1; } - - out + writeln!(f) } +} + +#[cfg(test)] +mod test { + use super::*; fn build_predicates_with_time( from: i64, @@ -586,72 +607,81 @@ mod test { let segment = Segment::new(6, columns); - let results = segment.read_filter( - &["count", "region", "time"], - &build_predicates_with_time(1, 6, vec![]), - ); - let expected = "count,region,time + let cases = vec![ + ( + vec!["count", "region", "time"], + build_predicates_with_time(1, 6, vec![]), + "count,region,time 100,west,1 101,west,2 200,east,3 203,west,4 -203,south,5"; - assert_eq!(stringify_read_filter_results(results), expected); +203,south,5 +", + ), + ( + vec!["time", "region", "method"], + build_predicates_with_time(-19, 2, vec![]), + "time,region,method +1,west,GET +", + ), + ( + vec!["time"], + build_predicates_with_time(0, 3, vec![]), + "time +1 +2 +", + ), + ( + vec!["method"], + build_predicates_with_time(0, 3, vec![]), + "method +GET +POST +", + ), + ( + vec!["count", "method", "time"], + build_predicates_with_time( + 0, + 6, + vec![("method", (Operator::Equal, Value::String("POST")))], + ), + "count,method,time +101,POST,2 +200,POST,3 +203,POST,4 +", + ), + ( + vec!["region", "time"], + build_predicates_with_time( + 0, + 6, + vec![("method", (Operator::Equal, Value::String("POST")))], + ), + "region,time +west,2 +east,3 +west,4 +", + ), + ]; - let results = segment.read_filter( - &["time", "region", "method"], - &build_predicates_with_time(-19, 2, vec![]), - ); - let expected = "time,region,method -1,west,GET"; - assert_eq!(stringify_read_filter_results(results), expected); + for (cols, predicates, expected) in cases { + let results = segment.read_filter(&cols, &predicates); + assert_eq!(format!("{:?}", &results), expected); + } + // test no matching rows let results = segment.read_filter( &["method", "region", "time"], &build_predicates_with_time(-19, 1, vec![]), ); let expected = ""; assert!(results.is_empty()); - - let results = segment.read_filter(&["time"], &build_predicates_with_time(0, 3, vec![])); - let expected = "time -1 -2"; - assert_eq!(stringify_read_filter_results(results), expected); - - let results = segment.read_filter(&["method"], &build_predicates_with_time(0, 3, vec![])); - let expected = "method -GET -POST"; - assert_eq!(stringify_read_filter_results(results), expected); - - let results = segment.read_filter( - &["count", "method", "time"], - &build_predicates_with_time( - 0, - 6, - vec![("method", (Operator::Equal, Value::String("POST")))], - ), - ); - let expected = "count,method,time -101,POST,2 -200,POST,3 -203,POST,4"; - assert_eq!(stringify_read_filter_results(results), expected); - - let results = segment.read_filter( - &["region", "time"], - &build_predicates_with_time( - 0, - 6, - vec![("method", (Operator::Equal, Value::String("POST")))], - ), - ); - let expected = "region,time -west,2 -east,3 -west,4"; - assert_eq!(stringify_read_filter_results(results), expected); } #[test] diff --git a/segment_store/src/table.rs b/segment_store/src/table.rs index 7b28144f9f..acd81875a3 100644 --- a/segment_store/src/table.rs +++ b/segment_store/src/table.rs @@ -1,10 +1,14 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::fmt::Display; use std::slice::Iter; use arrow_deps::arrow::record_batch::RecordBatch; -use crate::column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value, Values}; use crate::segment::{ColumnName, GroupKey, Predicate, Segment}; +use crate::{ + column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value}, + segment::ReadFilterResult, +}; /// A Table represents data for a single measurement. /// @@ -122,23 +126,25 @@ impl Table { &self, columns: &[ColumnName<'a>], predicates: &[Predicate<'_>], - ) -> Vec<(ColumnName<'a>, Vec)> { + ) -> ReadFilterResults<'a> { // identify segments where time range and predicates match could match // using segment meta data, and then execute against those segments and // merge results. let segments = self.filter_segments(predicates); - let mut results = columns.iter().map(|&col_name| (col_name, vec![])).collect(); + let mut results = ReadFilterResults { + names: columns.to_vec(), + values: vec![], + }; + if segments.is_empty() { return results; } for segment in segments { - let segment_result = segment.read_filter(columns, predicates); - for (i, (col_name, values)) in segment_result.into_iter().enumerate() { - assert_eq!(results[i].0, col_name); - results[i].1.push(values); - } + results + .values + .push(segment.read_filter(columns, predicates)); } results @@ -465,60 +471,49 @@ impl MetaData { } } +/// Encapsulates results from tables with a structure that makes them easier +/// to work with and display. +pub struct ReadFilterResults<'a> { + pub names: Vec>, + pub values: Vec>, +} + +impl<'a> ReadFilterResults<'a> { + pub fn is_empty(&self) -> bool { + self.values.is_empty() + } +} + +impl<'a> Display for ReadFilterResults<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // header line. + for (i, k) in self.names.iter().enumerate() { + write!(f, "{}", k)?; + + if i < self.names.len() - 1 { + write!(f, ",")?; + } + } + writeln!(f)?; + + if self.is_empty() { + return Ok(()); + } + + // Display all the results of each segment + for segment_values in &self.values { + segment_values.fmt(f)?; + } + Ok(()) + } +} + #[cfg(test)] mod test { use super::*; - use crate::column::{cmp::Operator, Column, ValuesIterator}; + use crate::column::{cmp::Operator, Column}; use crate::segment::{ColumnType, TIME_COLUMN_NAME}; - fn stringify_select_results(table: Vec<(ColumnName<'_>, Vec)>) -> String { - let mut out = String::new(); - // header line. - for (i, (k, _)) in table.iter().enumerate() { - out.push_str(k); - if i < table.len() - 1 { - out.push(','); - } - } - out.push('\n'); - - // TODO: handle empty results? - // let expected_rows = results[0].1.iter().map(|v| v.len() as i32).sum(); - - let total_segments = table[0].1.len(); - let mut segment = 0; - - while segment < total_segments { - // build an iterator for each column in this segment. - let mut col_itrs = table - .iter() - .map(|(k, v)| (*k, ValuesIterator::new(&v[segment]))) - .collect::>(); - - // cycle through each column draining one value per iterator until - // all the column iterators are drained. - let mut drained_column = 0; - while drained_column < table.len() { - for (i, (column_name, _)) in table.iter().enumerate() { - let itr = col_itrs.get_mut(column_name).unwrap(); - if let Some(v) = itr.next() { - out.push_str(&format!("{}", v)); - if i < table.len() - 1 { - out.push(','); - } - } else { - drained_column += 1; - } - } - out.push('\n'); - } - - segment += 1; - } - - out - } - fn build_predicates( from: i64, to: i64, @@ -575,7 +570,7 @@ mod test { &build_predicates(1, 31, vec![]), ); assert_eq!( - stringify_select_results(results), + format!("{}", &results), "time,count,region 1,100,west 2,101,west @@ -583,11 +578,9 @@ mod test { 4,203,west 5,203,south 6,10,north - 10,1000,south 20,1002,north 30,1200,east - ", ); @@ -602,17 +595,15 @@ mod test { ); assert_eq!( + format!("{}", &results), "time,region 1,west 2,west 3,east 4,west 6,north - 20,north - ", - stringify_select_results(results) ); } }