Merge pull request #520 from influxdata/er/refactor/read-filter-result
refactor: encapsulate results from segment/table into nicer types (pull/24376/head)
commit
54ae680780
|
@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::BTreeMap};
|
|||
use arrow_deps::arrow::datatypes::SchemaRef;
|
||||
|
||||
use crate::column::{
|
||||
cmp::Operator, Column, OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values,
|
||||
cmp::Operator, Column, OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator,
|
||||
};
|
||||
|
||||
/// The name used for a timestamp column.
|
||||
|
@ -169,9 +169,9 @@ impl Segment {
|
|||
&self,
|
||||
columns: &[ColumnName<'a>],
|
||||
predicates: &[Predicate<'_>],
|
||||
) -> Vec<(ColumnName<'a>, Values)> {
|
||||
) -> ReadFilterResult<'a> {
|
||||
let row_ids = self.row_ids_from_predicates(predicates);
|
||||
self.materialise_rows(columns, row_ids)
|
||||
ReadFilterResult(self.materialise_rows(columns, row_ids))
|
||||
}
|
||||
|
||||
fn materialise_rows<'a>(
|
||||
|
@ -434,50 +434,71 @@ impl MetaData {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::column::ValuesIterator;
|
||||
/// Encapsulates results from segments with a structure that makes them easier
|
||||
/// to work with and display.
|
||||
pub struct ReadFilterResult<'a>(pub Vec<(ColumnName<'a>, Values)>);
|
||||
|
||||
fn stringify_read_filter_results(results: Vec<(ColumnName<'_>, Values)>) -> String {
|
||||
let mut out = String::new();
|
||||
impl<'a> ReadFilterResult<'a> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::fmt::Debug for &ReadFilterResult<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// header line.
|
||||
for (i, (k, _)) in results.iter().enumerate() {
|
||||
out.push_str(k);
|
||||
if i < results.len() - 1 {
|
||||
out.push(',');
|
||||
for (i, (k, _)) in self.0.iter().enumerate() {
|
||||
write!(f, "{}", k)?;
|
||||
|
||||
if i < self.0.len() - 1 {
|
||||
write!(f, ",")?;
|
||||
}
|
||||
}
|
||||
out.push('\n');
|
||||
writeln!(f)?;
|
||||
|
||||
// TODO: handle empty results?
|
||||
let expected_rows = results[0].1.len();
|
||||
// Display the rest of the values.
|
||||
std::fmt::Display::fmt(&self, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::fmt::Display for &ReadFilterResult<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
if self.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let expected_rows = self.0[0].1.len();
|
||||
let mut rows = 0;
|
||||
|
||||
let mut iter_map = results
|
||||
let mut iter_map = self
|
||||
.0
|
||||
.iter()
|
||||
.map(|(k, v)| (*k, ValuesIterator::new(v)))
|
||||
.collect::<BTreeMap<&str, ValuesIterator<'_>>>();
|
||||
|
||||
while rows < expected_rows {
|
||||
if rows > 0 {
|
||||
out.push('\n');
|
||||
writeln!(f)?;
|
||||
}
|
||||
|
||||
for (i, (k, _)) in results.iter().enumerate() {
|
||||
for (i, (k, _)) in self.0.iter().enumerate() {
|
||||
if let Some(itr) = iter_map.get_mut(k) {
|
||||
out.push_str(&format!("{}", itr.next().unwrap()));
|
||||
if i < results.len() - 1 {
|
||||
out.push(',');
|
||||
write!(f, "{}", itr.next().unwrap())?;
|
||||
if i < self.0.len() - 1 {
|
||||
write!(f, ",")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rows += 1;
|
||||
}
|
||||
|
||||
out
|
||||
writeln!(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
fn build_predicates_with_time(
|
||||
from: i64,
|
||||
|
@ -586,72 +607,81 @@ mod test {
|
|||
|
||||
let segment = Segment::new(6, columns);
|
||||
|
||||
let results = segment.read_filter(
|
||||
&["count", "region", "time"],
|
||||
&build_predicates_with_time(1, 6, vec![]),
|
||||
);
|
||||
let expected = "count,region,time
|
||||
let cases = vec![
|
||||
(
|
||||
vec!["count", "region", "time"],
|
||||
build_predicates_with_time(1, 6, vec![]),
|
||||
"count,region,time
|
||||
100,west,1
|
||||
101,west,2
|
||||
200,east,3
|
||||
203,west,4
|
||||
203,south,5";
|
||||
assert_eq!(stringify_read_filter_results(results), expected);
|
||||
203,south,5
|
||||
",
|
||||
),
|
||||
(
|
||||
vec!["time", "region", "method"],
|
||||
build_predicates_with_time(-19, 2, vec![]),
|
||||
"time,region,method
|
||||
1,west,GET
|
||||
",
|
||||
),
|
||||
(
|
||||
vec!["time"],
|
||||
build_predicates_with_time(0, 3, vec![]),
|
||||
"time
|
||||
1
|
||||
2
|
||||
",
|
||||
),
|
||||
(
|
||||
vec!["method"],
|
||||
build_predicates_with_time(0, 3, vec![]),
|
||||
"method
|
||||
GET
|
||||
POST
|
||||
",
|
||||
),
|
||||
(
|
||||
vec!["count", "method", "time"],
|
||||
build_predicates_with_time(
|
||||
0,
|
||||
6,
|
||||
vec![("method", (Operator::Equal, Value::String("POST")))],
|
||||
),
|
||||
"count,method,time
|
||||
101,POST,2
|
||||
200,POST,3
|
||||
203,POST,4
|
||||
",
|
||||
),
|
||||
(
|
||||
vec!["region", "time"],
|
||||
build_predicates_with_time(
|
||||
0,
|
||||
6,
|
||||
vec![("method", (Operator::Equal, Value::String("POST")))],
|
||||
),
|
||||
"region,time
|
||||
west,2
|
||||
east,3
|
||||
west,4
|
||||
",
|
||||
),
|
||||
];
|
||||
|
||||
let results = segment.read_filter(
|
||||
&["time", "region", "method"],
|
||||
&build_predicates_with_time(-19, 2, vec![]),
|
||||
);
|
||||
let expected = "time,region,method
|
||||
1,west,GET";
|
||||
assert_eq!(stringify_read_filter_results(results), expected);
|
||||
for (cols, predicates, expected) in cases {
|
||||
let results = segment.read_filter(&cols, &predicates);
|
||||
assert_eq!(format!("{:?}", &results), expected);
|
||||
}
|
||||
|
||||
// test no matching rows
|
||||
let results = segment.read_filter(
|
||||
&["method", "region", "time"],
|
||||
&build_predicates_with_time(-19, 1, vec![]),
|
||||
);
|
||||
let expected = "";
|
||||
assert!(results.is_empty());
|
||||
|
||||
let results = segment.read_filter(&["time"], &build_predicates_with_time(0, 3, vec![]));
|
||||
let expected = "time
|
||||
1
|
||||
2";
|
||||
assert_eq!(stringify_read_filter_results(results), expected);
|
||||
|
||||
let results = segment.read_filter(&["method"], &build_predicates_with_time(0, 3, vec![]));
|
||||
let expected = "method
|
||||
GET
|
||||
POST";
|
||||
assert_eq!(stringify_read_filter_results(results), expected);
|
||||
|
||||
let results = segment.read_filter(
|
||||
&["count", "method", "time"],
|
||||
&build_predicates_with_time(
|
||||
0,
|
||||
6,
|
||||
vec![("method", (Operator::Equal, Value::String("POST")))],
|
||||
),
|
||||
);
|
||||
let expected = "count,method,time
|
||||
101,POST,2
|
||||
200,POST,3
|
||||
203,POST,4";
|
||||
assert_eq!(stringify_read_filter_results(results), expected);
|
||||
|
||||
let results = segment.read_filter(
|
||||
&["region", "time"],
|
||||
&build_predicates_with_time(
|
||||
0,
|
||||
6,
|
||||
vec![("method", (Operator::Equal, Value::String("POST")))],
|
||||
),
|
||||
);
|
||||
let expected = "region,time
|
||||
west,2
|
||||
east,3
|
||||
west,4";
|
||||
assert_eq!(stringify_read_filter_results(results), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt::Display;
|
||||
use std::slice::Iter;
|
||||
|
||||
use arrow_deps::arrow::record_batch::RecordBatch;
|
||||
|
||||
use crate::column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value, Values};
|
||||
use crate::segment::{ColumnName, GroupKey, Predicate, Segment};
|
||||
use crate::{
|
||||
column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value},
|
||||
segment::ReadFilterResult,
|
||||
};
|
||||
|
||||
/// A Table represents data for a single measurement.
|
||||
///
|
||||
|
@ -122,23 +126,25 @@ impl Table {
|
|||
&self,
|
||||
columns: &[ColumnName<'a>],
|
||||
predicates: &[Predicate<'_>],
|
||||
) -> Vec<(ColumnName<'a>, Vec<Values>)> {
|
||||
) -> ReadFilterResults<'a> {
|
||||
// identify segments where the time range and predicates could match
|
||||
// using segment meta data, and then execute against those segments and
|
||||
// merge results.
|
||||
let segments = self.filter_segments(predicates);
|
||||
|
||||
let mut results = columns.iter().map(|&col_name| (col_name, vec![])).collect();
|
||||
let mut results = ReadFilterResults {
|
||||
names: columns.to_vec(),
|
||||
values: vec![],
|
||||
};
|
||||
|
||||
if segments.is_empty() {
|
||||
return results;
|
||||
}
|
||||
|
||||
for segment in segments {
|
||||
let segment_result = segment.read_filter(columns, predicates);
|
||||
for (i, (col_name, values)) in segment_result.into_iter().enumerate() {
|
||||
assert_eq!(results[i].0, col_name);
|
||||
results[i].1.push(values);
|
||||
}
|
||||
results
|
||||
.values
|
||||
.push(segment.read_filter(columns, predicates));
|
||||
}
|
||||
|
||||
results
|
||||
|
@ -465,60 +471,49 @@ impl MetaData {
|
|||
}
|
||||
}
|
||||
|
||||
/// Encapsulates results from tables with a structure that makes them easier
|
||||
/// to work with and display.
|
||||
pub struct ReadFilterResults<'a> {
|
||||
pub names: Vec<ColumnName<'a>>,
|
||||
pub values: Vec<ReadFilterResult<'a>>,
|
||||
}
|
||||
|
||||
impl<'a> ReadFilterResults<'a> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.values.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Display for ReadFilterResults<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// header line.
|
||||
for (i, k) in self.names.iter().enumerate() {
|
||||
write!(f, "{}", k)?;
|
||||
|
||||
if i < self.names.len() - 1 {
|
||||
write!(f, ",")?;
|
||||
}
|
||||
}
|
||||
writeln!(f)?;
|
||||
|
||||
if self.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Display all the results of each segment
|
||||
for segment_values in &self.values {
|
||||
segment_values.fmt(f)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::column::{cmp::Operator, Column, ValuesIterator};
|
||||
use crate::column::{cmp::Operator, Column};
|
||||
use crate::segment::{ColumnType, TIME_COLUMN_NAME};
|
||||
|
||||
fn stringify_select_results(table: Vec<(ColumnName<'_>, Vec<Values>)>) -> String {
|
||||
let mut out = String::new();
|
||||
// header line.
|
||||
for (i, (k, _)) in table.iter().enumerate() {
|
||||
out.push_str(k);
|
||||
if i < table.len() - 1 {
|
||||
out.push(',');
|
||||
}
|
||||
}
|
||||
out.push('\n');
|
||||
|
||||
// TODO: handle empty results?
|
||||
// let expected_rows = results[0].1.iter().map(|v| v.len() as i32).sum();
|
||||
|
||||
let total_segments = table[0].1.len();
|
||||
let mut segment = 0;
|
||||
|
||||
while segment < total_segments {
|
||||
// build an iterator for each column in this segment.
|
||||
let mut col_itrs = table
|
||||
.iter()
|
||||
.map(|(k, v)| (*k, ValuesIterator::new(&v[segment])))
|
||||
.collect::<BTreeMap<&str, _>>();
|
||||
|
||||
// cycle through each column draining one value per iterator until
|
||||
// all the column iterators are drained.
|
||||
let mut drained_column = 0;
|
||||
while drained_column < table.len() {
|
||||
for (i, (column_name, _)) in table.iter().enumerate() {
|
||||
let itr = col_itrs.get_mut(column_name).unwrap();
|
||||
if let Some(v) = itr.next() {
|
||||
out.push_str(&format!("{}", v));
|
||||
if i < table.len() - 1 {
|
||||
out.push(',');
|
||||
}
|
||||
} else {
|
||||
drained_column += 1;
|
||||
}
|
||||
}
|
||||
out.push('\n');
|
||||
}
|
||||
|
||||
segment += 1;
|
||||
}
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
fn build_predicates(
|
||||
from: i64,
|
||||
to: i64,
|
||||
|
@ -575,7 +570,7 @@ mod test {
|
|||
&build_predicates(1, 31, vec![]),
|
||||
);
|
||||
assert_eq!(
|
||||
stringify_select_results(results),
|
||||
format!("{}", &results),
|
||||
"time,count,region
|
||||
1,100,west
|
||||
2,101,west
|
||||
|
@ -583,11 +578,9 @@ mod test {
|
|||
4,203,west
|
||||
5,203,south
|
||||
6,10,north
|
||||
|
||||
10,1000,south
|
||||
20,1002,north
|
||||
30,1200,east
|
||||
|
||||
",
|
||||
);
|
||||
|
||||
|
@ -602,17 +595,15 @@ mod test {
|
|||
);
|
||||
|
||||
assert_eq!(
|
||||
format!("{}", &results),
|
||||
"time,region
|
||||
1,west
|
||||
2,west
|
||||
3,east
|
||||
4,west
|
||||
6,north
|
||||
|
||||
20,north
|
||||
|
||||
",
|
||||
stringify_select_results(results)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue