refactor: initial read_group skeleton

pull/24376/head
Edd Robinson 2020-12-01 22:59:10 +00:00
parent c199d59c04
commit f7f87164b4
4 changed files with 286 additions and 30 deletions

View File

@ -4,7 +4,7 @@
#![allow(unused_variables)]
pub mod column;
pub(crate) mod partition;
pub(crate) mod segment;
pub mod segment;
pub(crate) mod table;
use std::collections::BTreeMap;
@ -209,6 +209,26 @@ impl Store {
}
}
/// Generate a predicate for the time range [from, to).
///
/// The result is a pair of conjunctive predicates on the time column:
/// `time >= from AND time < to` (half-open interval).
pub fn time_range_predicate<'a>(from: i64, to: i64) -> Vec<segment::Predicate<'a>> {
    // Both bounds apply to the same (time) column; only the operator and the
    // scalar differ, so build them with a small helper closure.
    let bound = |op, v| {
        (
            segment::TIME_COLUMN_NAME,
            (op, column::Value::Scalar(column::Scalar::I64(v))),
        )
    };

    vec![
        bound(column::cmp::Operator::GTE, from),
        bound(column::cmp::Operator::LT, to),
    ]
}
// A database is scoped to a single tenant. Within a database there exist
// tables for measurements. There is a 1:1 mapping between a table and a
// measurement name.

View File

@ -51,7 +51,7 @@ impl Partition {
time_range: (i64, i64),
predicates: &[(&str, &str)],
select_columns: Vec<ColumnName<'_>>,
) -> BTreeMap<ColumnName<'_>, Values> {
) -> BTreeMap<ColumnName<'_>, Values<'_>> {
// Find the measurement name on the partition and dispatch query to the
// table for that measurement if the partition's time range overlaps the
// requested time range.
@ -79,7 +79,7 @@ impl Partition {
predicates: &[(&str, &str)],
group_columns: Vec<ColumnName<'_>>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
) -> BTreeMap<GroupKey, Vec<(ColumnName<'_>, AggregateResult<'_>)>> {
) -> BTreeMap<GroupKey<'_>, Vec<(ColumnName<'_>, AggregateResult<'_>)>> {
// Find the measurement name on the partition and dispatch query to the
// table for that measurement if the partition's time range overlaps the
// requested time range.

View File

@ -3,7 +3,8 @@ use std::{borrow::Cow, collections::BTreeMap};
use arrow_deps::arrow::datatypes::SchemaRef;
use crate::column::{
cmp::Operator, Column, OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator,
cmp::Operator, AggregateResult, AggregateType, Column, OwnedValue, RowIDs, RowIDsOption,
Scalar, Value, Values, ValuesIterator,
};
/// The name used for a timestamp column.
@ -129,11 +130,8 @@ impl Segment {
}
// Returns a reference to a column from the column name.
fn column_by_name(&self, name: ColumnName<'_>) -> Option<&Column> {
match self.all_columns_by_name.get(name) {
Some(&idx) => Some(&self.columns[idx]),
None => None,
}
fn column_by_name(&self, name: ColumnName<'_>) -> &Column {
&self.columns[*self.all_columns_by_name.get(name).unwrap()]
}
// Returns a reference to the timestamp column.
@ -166,19 +164,19 @@ impl Segment {
///
/// Right now, predicates are conjunctive (AND).
pub fn read_filter<'a>(
&self,
&'a self,
columns: &[ColumnName<'a>],
predicates: &[Predicate<'_>],
) -> ReadFilterResult<'a> {
) -> ReadFilterResult<'_> {
let row_ids = self.row_ids_from_predicates(predicates);
ReadFilterResult(self.materialise_rows(columns, row_ids))
}
fn materialise_rows<'a>(
&self,
&'a self,
columns: &[ColumnName<'a>],
row_ids: RowIDsOption,
) -> Vec<(ColumnName<'a>, Values)> {
) -> Vec<(ColumnName<'_>, Values<'_>)> {
let mut results = vec![];
match row_ids {
RowIDsOption::None(_) => results, // nothing to materialise
@ -187,7 +185,7 @@ impl Segment {
// buffer to the croaring Bitmap API.
let row_ids = row_ids.to_vec();
for &col_name in columns {
let col = self.column_by_name(col_name).unwrap();
let col = self.column_by_name(col_name);
results.push((col_name, col.values(row_ids.as_slice())));
}
results
@ -200,7 +198,7 @@ impl Segment {
let row_ids = (0..self.rows()).collect::<Vec<_>>();
for &col_name in columns {
let col = self.column_by_name(col_name).unwrap();
let col = self.column_by_name(col_name);
results.push((col_name, col.values(row_ids.as_slice())));
}
results
@ -264,8 +262,8 @@ impl Segment {
for (col_name, (op, value)) in predicates.iter() {
// N.B column should always exist because validation of
// predicates is not the responsibility of the `Segment`.
let col = self.column_by_name(col_name).unwrap();
// predicates should happen at the `Table` level.
let col = self.column_by_name(col_name);
// Explanation of how this buffer pattern works. The idea is
// that the buffer should be returned to the caller so it can be
@ -323,13 +321,66 @@ impl Segment {
dst,
)
}
/// Returns a set of group keys and aggregated column data associated with
/// them.
///
/// Right now, predicates are conjunctive (AND).
///
/// NOTE(review): skeleton only — all arguments are currently ignored and an
/// empty result is returned; dispatch to the specialised `read_group_*`
/// implementations below is still TODO.
pub fn read_group<'a>(
&'a self,
predicates: &[Predicate<'a>],
group_columns: &[ColumnName<'a>],
aggregates: &[(ColumnName<'a>, AggregateType)],
) -> ReadGroupResult<'a> {
// TODO: choose an execution strategy (all-RLE fast path, single group
// column, sorted stream) and delegate to it.
ReadGroupResult::default()
}
// Optimised read group method when there are no predicates and all the group
// columns are RLE-encoded.
//
// In this case all the grouping columns have pre-computed bitsets for each
// distinct value.
//
// NOTE(review): unimplemented skeleton — calling this panics via `todo!()`.
fn read_group_all_rows_all_rle<'a>(
&'a self,
group_columns: &[ColumnName<'a>],
aggregates: &[(ColumnName<'a>, AggregateType)],
) -> ReadGroupResult<'a> {
todo!()
}
// Optimised read group method where only a single column is being used as
// the group key.
//
// In this case the groups can be represented by a single integer key,
// avoiding the need to materialise a multi-value `GroupKey` per row.
//
// NOTE(review): unimplemented skeleton — calling this panics via `todo!()`.
fn read_group_single_group_column(
&self,
predicates: &[Predicate<'_>],
group_column: ColumnName<'_>,
aggregates: &[(ColumnName<'_>, AggregateType)],
) -> ReadGroupResult<'_> {
todo!()
}
// Optimised read group method where the segment's column sort covers all
// the columns being grouped, such that the required rows are totally ordered.
//
// In this case the rows are already in "group key order" and the aggregates
// can be calculated by reading the rows in order.
//
// NOTE(review): the comment describes multiple grouped columns but the
// signature takes a single `group_column` — confirm the intended signature.
// Unimplemented skeleton — calling this panics via `todo!()`.
fn read_group_sorted_stream(
&self,
predicates: &[Predicate<'_>],
group_column: ColumnName<'_>,
aggregates: &[(ColumnName<'_>, AggregateType)],
) -> ReadGroupResult<'_> {
todo!()
}
}
pub type Predicate<'a> = (ColumnName<'a>, (Operator, Value<'a>));
// A GroupKey is an ordered collection of row values. The order determines which
// columns the values originated from.
pub type GroupKey = Vec<String>;
pub type GroupKey<'a> = Vec<Value<'a>>;
// A representation of a column name.
pub type ColumnName<'a> = &'a str;
@ -436,7 +487,7 @@ impl MetaData {
/// Encapsulates results from segments with a structure that makes them easier
/// to work with and display.
pub struct ReadFilterResult<'a>(pub Vec<(ColumnName<'a>, Values)>);
pub struct ReadFilterResult<'a>(pub Vec<(ColumnName<'a>, Values<'a>)>);
impl<'a> ReadFilterResult<'a> {
pub fn is_empty(&self) -> bool {
@ -684,6 +735,45 @@ west,4
assert!(results.is_empty());
}
#[test]
// Ignored because `Segment::read_group` is a skeleton that currently returns
// an empty default result, so the expected output below cannot match yet.
#[ignore]
fn read_group() {
// Build a 6-row segment with a time column, two tag columns (region,
// method) and one field column (count).
let mut columns = BTreeMap::new();
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
columns.insert("time".to_string(), tc);
let rc = ColumnType::Tag(Column::from(
&["west", "west", "east", "west", "south", "north"][..],
));
columns.insert("region".to_string(), rc);
let mc = ColumnType::Tag(Column::from(
&["GET", "POST", "POST", "POST", "PUT", "GET"][..],
));
columns.insert("method".to_string(), mc);
let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10][..]));
columns.insert("count".to_string(), fc);
let segment = Segment::new(6, columns);
// Each case: (predicates, group columns, aggregates, expected Display
// output). The time range [1, 6) excludes the last row (time == 6).
let cases = vec![(
build_predicates_with_time(1, 6, vec![]),
vec!["region", "method"],
vec![("count", AggregateType::Sum)],
"region,method,count_sum
west,GET,100
west,POST,304
east,POST,200
south,PUT,203",
)];
for (predicate, group_cols, aggs, expected) in cases {
let results = segment.read_group(&predicate, &group_cols, &aggs);
// Results are compared via their Display formatting.
assert_eq!(format!("{}", results), expected);
}
}
#[test]
fn segment_could_satisfy_predicate() {
let mut columns = BTreeMap::new();
@ -742,4 +832,47 @@ west,4
);
}
}
#[test]
// Verifies the CSV-like `Display` formatting of `ReadGroupResult`: one
// header-less row per group key, group values followed by aggregate values.
// NOTE(review): `groupby_columns`/`aggregate_columns` are left at their
// defaults (empty), so only the row section of the output is exercised —
// the header line collapses to the leading newline absorbed by the expected
// string's first row.
fn read_group_result_display() {
let result = ReadGroupResult {
// Five two-column group keys (region, host).
group_keys: vec![
vec![Value::String("east"), Value::String("host-a")],
vec![Value::String("east"), Value::String("host-b")],
vec![Value::String("west"), Value::String("host-a")],
vec![Value::String("west"), Value::String("host-c")],
vec![Value::String("west"), Value::String("host-d")],
],
// One (Sum, Count) pair per group key, in the same row order.
aggregates: vec![
vec![
AggregateResult::Sum(Scalar::I64(10)),
AggregateResult::Count(3),
],
vec![
AggregateResult::Sum(Scalar::I64(20)),
AggregateResult::Count(4),
],
vec![
AggregateResult::Sum(Scalar::I64(25)),
AggregateResult::Count(3),
],
vec![
AggregateResult::Sum(Scalar::I64(21)),
AggregateResult::Count(1),
],
vec![
AggregateResult::Sum(Scalar::I64(11)),
AggregateResult::Count(9),
],
],
};
let expected = "east,host-a,10,3
east,host-b,20,4
west,host-a,25,3
west,host-c,21,1
west,host-d,11,9";
assert_eq!(format!("{}", result), expected);
}
}

View File

@ -96,6 +96,16 @@ impl Table {
todo!()
}
// Determines if the table's schema contains all the provided column names,
// by checking each name against the column ranges held in the table's
// meta data.
fn has_all_columns(&self, names: &[ColumnName<'_>]) -> bool {
    names
        .iter()
        .all(|&name| self.meta.column_ranges.contains_key(name))
}
// Identify set of segments that may satisfy the predicates.
fn filter_segments(&self, predicates: &[Predicate<'_>]) -> Vec<&Segment> {
let mut segments = Vec::with_capacity(self.segments.len());
@ -123,7 +133,7 @@ impl Table {
/// since the epoch. Results are included if they satisfy the predicate and
/// fall with the [min, max) time range domain.
pub fn select<'a>(
&self,
&'a self,
columns: &[ColumnName<'a>],
predicates: &[Predicate<'_>],
) -> ReadFilterResults<'a> {
@ -164,16 +174,43 @@ impl Table {
/// and the type of aggregation required. Multiple aggregations can be
/// applied to the same column.
pub fn aggregate<'a>(
&self,
time_range: (i64, i64),
predicates: &[(&str, &str)],
&'a self,
predicates: &[Predicate<'a>],
group_columns: Vec<ColumnName<'a>>,
aggregates: Vec<(ColumnName<'a>, AggregateType)>,
) -> BTreeMap<GroupKey, Vec<(ColumnName<'a>, AggregateResult<'_>)>> {
) -> ReadGroupResult<'a> {
if !self.has_all_columns(&group_columns) {
return ReadGroupResult::default(); //TODO(edd): return an error here "group key column x not found"
}
if !self.has_all_columns(&aggregates.iter().map(|(name, _)| *name).collect::<Vec<_>>()) {
return ReadGroupResult::default(); //TODO(edd): return an error here "aggregate column x not found"
}
if !self.has_all_columns(&predicates.iter().map(|(name, _)| *name).collect::<Vec<_>>()) {
return ReadGroupResult::default(); //TODO(edd): return an error here "predicate column x not found"
}
// identify segments where time range and predicates match could match
// using segment meta data, and then execute against those segments and
// merge results.
self.aggregate_window(time_range, predicates, group_columns, aggregates, 0)
let segments = self.filter_segments(predicates);
if segments.is_empty() {
return ReadGroupResult::default();
}
let mut results = Vec::with_capacity(segments.len());
for segment in segments {
let segment_result = segment.read_group(predicates, &group_columns, &aggregates);
results.push(segment_result);
}
// TODO(edd): merge results across segments
ReadGroupResult {
groupby_columns: group_columns,
aggregate_columns: aggregates,
..ReadGroupResult::default()
}
}
/// Returns aggregates segmented by grouping keys and windowed by time.
@ -201,7 +238,7 @@ impl Table {
group_columns: Vec<ColumnName<'a>>,
aggregates: Vec<(ColumnName<'a>, AggregateType)>,
window: i64,
) -> BTreeMap<GroupKey, Vec<(ColumnName<'a>, AggregateResult<'_>)>> {
) -> BTreeMap<GroupKey<'_>, Vec<(ColumnName<'a>, AggregateResult<'_>)>> {
// identify segments where time range and predicates match could match
// using segment meta data, and then execute against those segments and
// merge results.
@ -253,10 +290,12 @@ impl Table {
));
}
AggregateType::Sum => {
results.push((
col_name,
AggregateResult::Sum(self.sum(col_name, time_range)),
));
let res = match self.sum(col_name, time_range) {
Some(x) => x,
None => Scalar::Null,
};
results.push((col_name, AggregateResult::Sum(res)));
}
}
}
@ -508,6 +547,70 @@ impl<'a> Display for ReadFilterResults<'a> {
}
}
/// Results of a grouped aggregation ("read group") query.
///
/// Group keys and aggregates are stored row-wise and kept in step: row `i`
/// of `group_keys` corresponds to row `i` of `aggregates`.
#[derive(Default)]
pub struct ReadGroupResult<'a> {
// column-wise collection of columns being grouped by
groupby_columns: Vec<ColumnName<'a>>,
// column-wise collection of columns being aggregated on
aggregate_columns: Vec<(ColumnName<'a>, AggregateType)>,
// row-wise collection of group keys. Each group key contains column-wise
// values for each of the groupby_columns.
group_keys: Vec<GroupKey<'a>>,
// row-wise collection of aggregates. Each aggregate contains column-wise
// values for each of the aggregate_columns.
aggregates: Vec<Vec<AggregateResult<'a>>>,
}
use std::iter::Iterator;
impl<'a> std::fmt::Display for ReadGroupResult<'a> {
    /// Formats the result as CSV-like text: a header line of group-by column
    /// names (each followed by a comma) then `name_agg` aggregate column
    /// names, followed by one line per group key holding the group values
    /// (each followed by a comma) and the comma-separated aggregate values.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header: every group-by column name gets a trailing comma, which
        // also separates the group section from the aggregate section.
        for name in &self.groupby_columns {
            write!(f, "{},", name)?;
        }
        // Aggregate column headers are comma-separated with no trailing comma.
        let agg_cols = self.aggregate_columns.len();
        for (i, (col_name, col_agg)) in self.aggregate_columns.iter().enumerate() {
            if i + 1 < agg_cols {
                write!(f, "{}_{},", col_name, col_agg)?;
            } else {
                write!(f, "{}_{}", col_name, col_agg)?;
            }
        }
        writeln!(f)?;

        // TODO: handle empty results?
        // Rows: newline-separated (no trailing newline after the last row).
        for (row, group_key) in self.group_keys.iter().enumerate() {
            if row > 0 {
                writeln!(f)?;
            }
            // Group key values, each with a trailing comma (mirrors header).
            for value in group_key {
                write!(f, "{},", value)?;
            }
            // Aggregate values, comma-separated with no trailing comma.
            let row_aggs = &self.aggregates[row];
            for (i, agg) in row_aggs.iter().enumerate() {
                if i + 1 < row_aggs.len() {
                    write!(f, "{},", agg)?;
                } else {
                    write!(f, "{}", agg)?;
                }
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod test {
use super::*;