refactor: wire up sum

pull/24376/head
Edd Robinson 2021-01-30 22:42:58 +00:00
parent fd25b6a9e2
commit 9cc9c714ed
3 changed files with 73 additions and 34 deletions

@@ -292,6 +292,15 @@ impl Database {
group_columns: Selection<'input>,
aggregates: Vec<(ColumnName<'input>, AggregateType)>,
) -> Result<ReadAggregateResults> {
for (_, agg) in &aggregates {
match agg {
AggregateType::First | AggregateType::Last => {
return Err(Error::UnsupportedAggregate { agg: *agg });
}
_ => {}
}
}
// get read lock on database
let partition_data = self.data.read().unwrap();
let mut chunk_table_results = vec![];
@@ -322,15 +331,6 @@ impl Database {
}
}
for (_, agg) in &aggregates {
match agg {
AggregateType::First | AggregateType::Last => {
return Err(Error::UnsupportedAggregate { agg: *agg });
}
_ => {}
}
}
Ok(ReadAggregateResults::new(chunk_table_results))
}
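Moving the unsupported-aggregate guard to the top means `First`/`Last` requests are rejected before the partition read lock is taken and before any per-chunk work is done. The same guard could be written more compactly with `matches!`; a minimal sketch, assuming the module's `Error` and `Result` types (the helper name `validate_aggregates` and its slice signature are illustrative, not part of this commit):

    // Reject aggregates the read buffer cannot execute yet, before doing
    // any other work. Hypothetical helper; mirrors the check above.
    fn validate_aggregates(aggregates: &[(&str, AggregateType)]) -> Result<()> {
        for (_, agg) in aggregates {
            if matches!(agg, AggregateType::First | AggregateType::Last) {
                return Err(Error::UnsupportedAggregate { agg: *agg });
            }
        }
        Ok(())
    }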
@@ -1123,7 +1123,7 @@ mod test {
}
#[test]
fn read_aggregate_multiple_row_groups() {
fn read_aggregate() {
let mut db = Database::new();
// Add a bunch of row groups to a single table in a single chunk
@@ -1142,17 +1142,49 @@ mod test {
Arc::new(StringArray::from(vec!["west", "west", "east"])),
Arc::new(Float64Array::from(vec![10.0, 30000.0, 4500.0])),
Arc::new(UInt64Array::from(vec![1000, 3000, 5000])),
Arc::new(Int64Array::from(vec![i, 20 * i, 30 * i])),
Arc::new(Int64Array::from(vec![i, 20 + i, 30 + i])),
];
// Add a record batch to a single partition
let rb = RecordBatch::try_new(schema.into(), data).unwrap();
println!("rb {:?} {:?}", i, &rb);
// The row group gets added to the same chunk each time.
db.upsert_partition("hour_1", 1, "table1", rb);
}
// Build the following query:
//
// Simple Aggregates - no group keys.
//
// QUERY:
//
// SELECT SUM("counter"), COUNT("counter")
// FROM "table1"
// WHERE "time" <= 130
//
let itr = db
.read_aggregate(
"hour_1",
"table1",
&[1],
Predicate::new(vec![BinaryExpr::from(("time", "<=", 130_i64))]),
Selection::Some(&[]),
vec![
("counter", AggregateType::Count),
("counter", AggregateType::Sum),
],
)
.unwrap();
let result = itr.collect::<Vec<RecordBatch>>();
assert_eq!(result.len(), 1);
let result = &result[0];
assert_rb_column_equals(&result, "counter_count", &Values::U64(vec![3]));
assert_rb_column_equals(&result, "counter_sum", &Values::U64(vec![9000]));
//
// With group keys
//
// QUERY:
//
// SELECT SUM("temp"), MIN("temp"), SUM("counter"), COUNT("counter")
// FROM "table1"

@@ -894,21 +894,27 @@ impl RowGroup {
false => match self.row_ids_from_predicate(predicate) {
RowIDsOption::Some(row_ids) => row_ids.to_vec(),
RowIDsOption::None(_) => vec![],
RowIDsOption::All(row_ids) => row_ids.to_vec(),
RowIDsOption::All(row_ids) => {
// see above comment.
(0..self.rows()).collect::<Vec<u32>>()
}
},
};
// the single row that will store the aggregate column values.
let mut aggregate_row = vec![];
for (col, agg_type) in aggregate_columns {
match agg_type {
AggregateType::Count => {
dst.aggregates
.push(AggregateResults(vec![AggregateResult::Count(
col.count(&row_ids) as u64,
)]));
aggregate_row.push(AggregateResult::Count(col.count(&row_ids) as u64));
}
AggregateType::Sum => {
aggregate_row.push(AggregateResult::Sum(col.sum(&row_ids)));
}
_ => todo!(),
}
}
dst.aggregates.push(AggregateResults(aggregate_row)); // write the row
}
}
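With this change a row group builds one result row containing every requested aggregate and pushes it once at the end, rather than pushing a single-element `AggregateResults` per aggregate as before. Illustratively (the values are hypothetical), a request for `[("counter", AggregateType::Count), ("counter", AggregateType::Sum)]` now yields a single row of the shape:

    AggregateResults(vec![
        AggregateResult::Count(3),                     // from col.count(&row_ids)
        AggregateResult::Sum(/* col.sum(&row_ids) */),
    ])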

@@ -855,12 +855,6 @@ impl Iterator for ReadAggregateResults {
// Helper type that can pretty print a set of results for `read_aggregate`.
struct DisplayReadAggregateResults<'a>(Vec<row_group::ReadAggregateResult<'a>>);
// impl DisplayReadAggregateResults<'_> {
// fn to_string(&self) -> String {
// format!("{}", &self)
// }
// }
impl std::fmt::Display for DisplayReadAggregateResults<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.0.is_empty() {
@@ -1148,7 +1142,7 @@ mod test {
// Build another row group.
let mut columns = BTreeMap::new();
columns.insert("time".to_string(), ColumnType::create_time(&[2, 2, 2]));
columns.insert("time".to_string(), ColumnType::create_time(&[2, 3, 4]));
columns.insert(
"region".to_string(),
ColumnType::create_tag(&["north", "north", "north"]),
@@ -1160,23 +1154,30 @@ mod test {
let mut results = table.read_aggregate(
Predicate::default(),
&Selection::Some(&[]),
&[("time", AggregateType::Count)],
&[("time", AggregateType::Count), ("time", AggregateType::Sum)],
);
// check the column result schema
let exp_schema = ResultSchema {
aggregate_columns: vec![(
schema::ColumnType::Timestamp("time".to_owned()),
AggregateType::Count,
LogicalDataType::Integer,
)],
aggregate_columns: vec![
(
schema::ColumnType::Timestamp("time".to_owned()),
AggregateType::Count,
LogicalDataType::Integer,
),
(
schema::ColumnType::Timestamp("time".to_owned()),
AggregateType::Sum,
LogicalDataType::Integer,
),
],
..ResultSchema::default()
};
assert_eq!(results.schema(), &exp_schema);
assert_eq!(
DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(),
"time_count\n6\n",
"time_count,time_sum\n6,609\n",
);
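// Illustrative check (not part of the commit): the second row group above
// contributes three rows with times 2, 3 and 4, a partial sum of 9, so the
// assertion implies the first row group, outside this hunk, contributes
// three rows whose times sum to 600, giving time_count = 6 and
// time_sum = 609.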
assert!(matches!(results.next_merged_result(), None));
@@ -1184,12 +1185,12 @@ mod test {
let mut results = table.read_aggregate(
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
&Selection::Some(&[]),
&[("time", AggregateType::Count)],
&[("time", AggregateType::Count), ("time", AggregateType::Sum)],
);
assert_eq!(
DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(),
"time_count\n2\n",
"time_count,time_sum\n2,300\n",
);
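// Illustrative check (not part of the commit): the second row group is all
// "north", so the region = "west" predicate only matches rows in the first
// row group; the assertion implies two such rows, with times summing to 300.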
assert!(matches!(results.next_merged_result(), None));
}