refactor: move AggregateType and push aggregates down

pull/24376/head
Edd Robinson 2020-08-26 21:51:46 +01:00
parent d1f9ca3acf
commit f588b9ff61
4 changed files with 97 additions and 36 deletions

View File

@@ -12,8 +12,8 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
 use arrow::{array, array::Array, datatypes, ipc};
 use delorean_mem_qe::column;
-use delorean_mem_qe::column::Column;
-use delorean_mem_qe::segment::{Aggregate, GroupingStrategy, Schema, Segment};
+use delorean_mem_qe::column::{AggregateType, Column};
+use delorean_mem_qe::segment::{GroupingStrategy, Schema, Segment};
 use delorean_mem_qe::{adapter::DeloreanQueryEngine, Store};
 use parquet::arrow::arrow_reader::ArrowReader;
@@ -63,13 +63,13 @@ fn main() {
     );
     let store = Arc::new(store);

-    time_select_with_pred(&store);
-    time_datafusion_select_with_pred(store.clone());
-    time_first_host(&store);
-    time_sum_range(&store);
-    time_count_range(&store);
-    time_group_single_with_pred(&store);
-    time_group_by_multi_agg_count(&store);
+    // time_select_with_pred(&store);
+    // time_datafusion_select_with_pred(store.clone());
+    // time_first_host(&store);
+    // time_sum_range(&store);
+    // time_count_range(&store);
+    // time_group_single_with_pred(&store);
+    // time_group_by_multi_agg_count(&store);
     time_group_by_multi_agg_sorted_count(&store);
 }
@@ -113,7 +113,7 @@ fn build_store(
 ) -> Result<(), Error> {
     let mut total_rows_read = 0;
     let start = std::time::Instant::now();
-    // let mut i = 0;
+    let mut i = 0;
     loop {
         let rb = reader.next_batch();
         match rb {
@@ -441,7 +441,7 @@ fn time_group_single_with_pred(store: &Store) {
             (1588834080000000, 1590044410000000),
             &[],
             &"env".to_string(),
-            &vec![("counter".to_string(), Aggregate::Count)],
+            &vec![("counter".to_string(), AggregateType::Count)],
         );
         track += results.len();
     }
@@ -457,6 +457,12 @@ fn time_group_single_with_pred(store: &Store) {
     );
 }

+//
+// SELECT COUNT(counter)
+// FROM measurement
+// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
+// GROUP BY "status", "method"
+//
 fn time_group_by_multi_agg_count(store: &Store) {
     let strats = vec![
         GroupingStrategy::HashGroup,
@@ -477,7 +483,7 @@ fn time_group_by_multi_agg_count(store: &Store) {
             (1589000000000001, 1590044410000000),
             &[],
             vec!["status".to_string(), "method".to_string()],
-            vec![("counter".to_string(), Aggregate::Count)],
+            vec![("counter".to_string(), AggregateType::Count)],
             strat,
         );
@@ -495,16 +501,22 @@ fn time_group_by_multi_agg_count(store: &Store) {
     }
 }

+//
+// SELECT COUNT(counter)
+// FROM measurement
+// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
+// GROUP BY "env", "role"
+//
 fn time_group_by_multi_agg_sorted_count(store: &Store) {
     let strats = vec![
-        GroupingStrategy::HashGroup,
-        GroupingStrategy::HashGroupConcurrent,
+        // GroupingStrategy::HashGroup,
+        // GroupingStrategy::HashGroupConcurrent,
         GroupingStrategy::SortGroup,
-        GroupingStrategy::SortGroupConcurrent,
+        // GroupingStrategy::SortGroupConcurrent,
     ];

     for strat in &strats {
-        let repeat = 10000;
+        let repeat = 10;
         let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
         let mut total_max = 0;
         let segments = store.segments();
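Every `time_*` benchmark in this file follows the same harness pattern visible in the context lines above: run the query `repeat` times, accumulate the elapsed time, and report totals and averages. A minimal sketch of that pattern (simplified; the real functions also accumulate result counts in `total_max`):

```rust
// Sketch of the benchmark harness used by the time_* functions: time
// `repeat` runs of a query and report total and average duration.
let repeat: u32 = 10;
let mut total_time = std::time::Duration::new(0, 0);
for _ in 0..repeat {
    let now = std::time::Instant::now();
    // ... execute the grouped aggregate against each segment ...
    total_time += now.elapsed();
}
println!(
    "ran {} times, total {:?}, avg {:?}",
    repeat,
    total_time,
    total_time / repeat
);
```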
@@ -515,7 +527,7 @@ fn time_group_by_multi_agg_sorted_count(store: &Store) {
             (1589000000000001, 1590044410000000),
             &[],
             vec!["env".to_string(), "role".to_string()],
-            vec![("counter".to_string(), Aggregate::Count)],
+            vec![("counter".to_string(), AggregateType::Count)],
             strat,
         );
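Taken together, the changes in this file swap `Aggregate::Count` for `AggregateType::Count` at every call site: callers now name the kind of aggregate they want instead of constructing a result value. A sketch of the resulting call shape (`read_group_eq` is a hypothetical method name here, and `segments`/`strat` come from the benchmark context; the argument list mirrors the signature changed in the segment hunks below):

```rust
// Hypothetical call shape after the refactor: pass the aggregate *kind*;
// the engine returns column::Aggregate values carrying the computed results.
let groups = segments.read_group_eq(
    (1589000000000001, 1590044410000000), // time range bounds (ns)
    &[],                                  // no additional predicates
    vec!["env".to_string(), "role".to_string()],
    vec![("counter".to_string(), AggregateType::Count)],
    strat, // one of the GroupingStrategy variants
);
```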

View File

@@ -103,6 +103,12 @@ pub enum Aggregate<'a> {
     Sum(Scalar<'a>),
 }

+#[derive(Debug, Clone)]
+pub enum AggregateType {
+    Count,
+    Sum,
+}
+
 impl<'a> Aggregate<'a> {
     pub fn update_with(&mut self, other: Scalar<'a>) {
         match self {
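The new enum makes the request/result split explicit: `AggregateType` says which aggregate a caller wants, while the existing `Aggregate<'a>` holds a computed result (a count, or a `Scalar` sum). A self-contained illustration of the split, with a simplified result type standing in for `column::Aggregate`:

```rust
// The request carries no value; the result does. Simplified sketch only:
// the real result type is column::Aggregate, which wraps Scalar values.
#[derive(Debug, Clone)]
enum AggregateType {
    Count,
    Sum,
}

#[derive(Debug, PartialEq)]
enum AggregateResult {
    Count(u64),
    Sum(f64),
}

fn aggregate(agg: &AggregateType, values: &[f64]) -> AggregateResult {
    match agg {
        AggregateType::Count => AggregateResult::Count(values.len() as u64),
        AggregateType::Sum => AggregateResult::Sum(values.iter().sum()),
    }
}

fn main() {
    let v = [1.0, 2.0, 3.0];
    assert_eq!(aggregate(&AggregateType::Count, &v), AggregateResult::Count(3));
    assert_eq!(aggregate(&AggregateType::Sum, &v), AggregateResult::Sum(6.0));
}
```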
@@ -319,8 +325,8 @@ impl Column {
         }
     }

-    /// Materialise all of the decoded values matching the provided logical
-    /// row ids.
+    /// Materialise the decoded value matching the provided logical
+    /// row id.
     pub fn value(&self, row_id: usize) -> Option<Scalar> {
         match self {
             Column::String(c) => {
@@ -726,6 +732,27 @@ impl Column {
         }
     }

+    pub fn aggregate_by_id_range(
+        &self,
+        agg_type: &AggregateType,
+        from_row_id: usize,
+        to_row_id: usize,
+    ) -> Aggregate {
+        match self {
+            Column::String(_) => unimplemented!("not implemented"),
+            Column::Float(c) => match agg_type {
+                AggregateType::Count => {
+                    Aggregate::Count(c.count_by_id_range(from_row_id, to_row_id) as u64)
+                }
+                AggregateType::Sum => {
+                    Aggregate::Sum(Scalar::Float(c.sum_by_id_range(from_row_id, to_row_id)))
+                }
+            },
+            Column::Integer(_) => unimplemented!("not implemented"),
+        }
+    }
+
     pub fn group_by_ids(&self) -> &std::collections::BTreeMap<u32, croaring::Bitmap> {
         match self {
             Column::String(c) => c.data.group_row_ids(),
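`aggregate_by_id_range` is the "push down" in the commit title: the aggregate runs over a contiguous run of logical row ids directly inside the column, with no intermediate materialisation. Hypothetical usage against a float column `col` (names invented for illustration):

```rust
// Aggregate rows [100, 200) of a float column in place.
let count = col.aggregate_by_id_range(&AggregateType::Count, 100, 200);
let sum = col.aggregate_by_id_range(&AggregateType::Sum, 100, 200);
// `count` is Aggregate::Count(100); `sum` is Aggregate::Sum(Scalar::Float(..)).
// String and integer columns currently panic with unimplemented!().
```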
@@ -977,6 +1004,14 @@ impl Float {
     pub fn sum_by_ids(&self, row_ids: &mut croaring::Bitmap) -> f64 {
         self.data.sum_by_ids(row_ids)
     }
+
+    pub fn sum_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> f64 {
+        self.data.sum_by_id_range(from_row_id, to_row_id)
+    }
+
+    pub fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> usize {
+        self.data.count_by_id_range(from_row_id, to_row_id)
+    }
 }

 impl From<&[f64]> for Float {

View File

@@ -156,6 +156,18 @@ where
         bm
     }

+    pub fn sum_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> T {
+        let mut res = T::default();
+        for v in self.values[from_row_id..to_row_id].iter() {
+            res += *v;
+        }
+        res
+    }
+
+    pub fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> usize {
+        to_row_id - from_row_id
+    }
+
     // TODO(edd): make faster
     pub fn sum_by_ids(&self, row_ids: &mut croaring::Bitmap) -> T {
         let mut res = T::default();
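Both helpers treat `(from_row_id, to_row_id)` as a half-open range `[from, to)`. That is also why `count_by_id_range` reduces to plain subtraction in this dense encoding: every row id in the range holds exactly one value. For example:

```rust
// Half-open range semantics on a dense, one-value-per-row encoding:
let values = vec![10.0_f64, 20.0, 30.0, 40.0];
let sum: f64 = values[1..3].iter().sum(); // rows 1 and 2
let count = 3 - 1;                        // count_by_id_range(1, 3)
assert_eq!(sum, 50.0);
assert_eq!(count, 2);
```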

View File

@@ -1,7 +1,7 @@
 use std::collections::{BTreeMap, BTreeSet, HashMap};

 use super::column;
-use super::column::Column;
+use super::column::{AggregateType, Column};
 use arrow::datatypes::SchemaRef;

 // Only used in a couple of specific places for experimentation.
@@ -226,7 +226,7 @@ impl Segment {
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: &[String],
-        aggregates: &[(String, Aggregate)],
+        aggregates: &[(String, AggregateType)],
     ) -> BTreeMap<Vec<String>, Vec<(String, Option<column::Aggregate>)>> {
         // println!("working segment {:?}", time_range);

         // Build a hash table - essentially, scan columns for matching row ids,
@@ -323,7 +323,7 @@ impl Segment {
         let mut hash_table: HashMap<
             Vec<Option<&i64>>,
-            Vec<(&String, &Aggregate, Option<column::Aggregate>)>,
+            Vec<(&String, &AggregateType, Option<column::Aggregate>)>,
         > = HashMap::with_capacity(30000);

         let mut aggregate_row: Vec<(&str, Option<column::Scalar>)> =
@@ -361,7 +361,7 @@ impl Segment {
             // a place-holder for each aggregate being executed.
             let group_key_entry = hash_table.entry(group_row).or_insert_with(|| {
                 // TODO COULD BE MAP/COLLECT
-                let mut agg_results: Vec<(&String, &Aggregate, Option<column::Aggregate>)> =
+                let mut agg_results: Vec<(&String, &AggregateType, Option<column::Aggregate>)> =
                     Vec::with_capacity(aggregates.len());
                 for (col_name, agg_type) in aggregates {
                     agg_results.push((col_name, agg_type, None)); // switch out Aggregate for Option<column::Aggregate>
@@ -396,8 +396,10 @@ impl Segment {
                     },
                     None => {
                         *cum_agg_value = match agg_type {
-                            Aggregate::Count => Some(column::Aggregate::Count(0)),
-                            Aggregate::Sum => Some(column::Aggregate::Sum(row_value.clone())),
+                            AggregateType::Count => Some(column::Aggregate::Count(0)),
+                            AggregateType::Sum => {
+                                Some(column::Aggregate::Sum(row_value.clone()))
+                            }
                         }
                     }
                 }
@@ -414,7 +416,7 @@ impl Segment {
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: &[String],
-        aggregates: &[(String, Aggregate)],
+        aggregates: &[(String, AggregateType)],
     ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
         // filter on predicates and time
         let filtered_row_ids: croaring::Bitmap;
@@ -536,8 +538,8 @@ impl Segment {
                 .zip(last_agg_row.iter())
                 .map(|((col_name, agg_type), curr_agg)| {
                     let agg = match agg_type {
-                        Aggregate::Count => column::Aggregate::Count(1),
-                        Aggregate::Sum => column::Aggregate::Sum(curr_agg.clone()),
+                        AggregateType::Count => column::Aggregate::Count(1),
+                        AggregateType::Sum => column::Aggregate::Sum(curr_agg.clone()),
                     };
                     (col_name.clone(), agg)
                 })
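This hunk sits in the sort-based grouping path that `time_group_by_multi_agg_sorted_count` exercises: rows arrive ordered by group key, so the code can emit the accumulated aggregates whenever the key changes instead of maintaining a hash table. A self-contained sketch of that one-pass scan (simplified types, counting and summing one value column):

```rust
// One-pass grouping over rows already sorted by key: flush (key, sum, count)
// each time the group key changes, then flush the final group. Assumes a
// non-empty input slice.
fn sorted_group(rows: &[(&str, f64)]) -> Vec<(String, f64, u64)> {
    let mut out = Vec::new();
    let (mut key, mut sum, mut n) = (rows[0].0, 0.0, 0u64);
    for &(k, v) in rows {
        if k != key {
            out.push((key.to_string(), sum, n));
            key = k;
            sum = 0.0;
            n = 0;
        }
        sum += v;
        n += 1;
    }
    out.push((key.to_string(), sum, n));
    out
}

// sorted_group(&[("prod", 1.0), ("prod", 2.0), ("test", 3.0)])
//   -> [("prod", 3.0, 2), ("test", 3.0, 1)]
```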
@@ -717,8 +719,8 @@ impl Segment {
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
         group_column: &String,
-        aggregates: &Vec<(String, Aggregate)>,
-    ) -> BTreeMap<u32, Vec<((String, Aggregate), column::Aggregate)>> {
+        aggregates: &Vec<(String, column::AggregateType)>,
+    ) -> BTreeMap<u32, Vec<((String, AggregateType), column::Aggregate)>> {
         let mut grouped_results = BTreeMap::new();

         let filter_row_ids: croaring::Bitmap;
@@ -734,12 +736,12 @@ impl Segment {
             let mut filtered_row_ids = row_ids.and(&filter_row_ids);
             if !filtered_row_ids.is_empty() {
                 // First calculate all of the aggregates for this grouped value
-                let mut aggs: Vec<((String, Aggregate), column::Aggregate)> =
+                let mut aggs: Vec<((String, AggregateType), column::Aggregate)> =
                     Vec::with_capacity(aggregates.len());
                 for (col_name, agg) in aggregates {
                     match &agg {
-                        Aggregate::Sum => {
+                        AggregateType::Sum => {
                             aggs.push((
                                 (col_name.to_string(), agg.clone()),
                                 column::Aggregate::Sum(
@@ -747,7 +749,7 @@ impl Segment {
                                 ), // assuming no non-null group keys
                             ));
                         }
-                        Aggregate::Count => {
+                        AggregateType::Count => {
                             aggs.push((
                                 (col_name.to_string(), agg.clone()),
                                 column::Aggregate::Count(
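The single-group-column path above is driven by row-id bitmaps: each group value carries a pre-computed `croaring::Bitmap` of the rows it appears in (see `group_by_ids` in the column hunks), and that bitmap is intersected with the filter bitmap before any aggregate runs. A small sketch of the intersection, using the same croaring calls as the hunk:

```rust
// Intersect a group's row ids with the filtered row ids; aggregate only
// the survivors.
let group_rows = croaring::Bitmap::of(&[1, 2, 3, 10]);
let filter_rows = croaring::Bitmap::of(&[2, 3, 4]);
let matching = group_rows.and(&filter_rows); // {2, 3}
if !matching.is_empty() {
    // e.g. column::Aggregate::Count(matching.cardinality()), or a sum over
    // just these rows via something like sum_by_ids.
}
```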
@@ -898,7 +900,7 @@ impl<'a> Segments<'a> {
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: Vec<String>,
-        aggregates: Vec<(String, Aggregate)>,
+        aggregates: Vec<(String, AggregateType)>,
         strategy: &GroupingStrategy,
     ) -> BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> {
         let (min, max) = time_range;
@@ -957,7 +959,7 @@ impl<'a> Segments<'a> {
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: Vec<String>,
-        aggregates: Vec<(String, Aggregate)>,
+        aggregates: Vec<(String, AggregateType)>,
         concurrent: bool,
     ) -> BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> {
         if concurrent {
@@ -1034,7 +1036,7 @@ impl<'a> Segments<'a> {
         time_range: (i64, i64),
         predicates: &[(&str, Option<&column::Scalar>)],
         group_columns: Vec<String>,
-        aggregates: Vec<(String, Aggregate)>,
+        aggregates: Vec<(String, AggregateType)>,
         concurrent: bool,
     ) -> BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> {
         if concurrent {
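Each grouping strategy comes in a sequential and a concurrent flavour, selected by the `concurrent` flag: the concurrent path aggregates segments on separate threads and merges the per-segment partials into one result map. A conceptual sketch of that fan-out/merge shape (standard-library scoped threads and simplified types; the real code merges `column::Aggregate` values keyed by the group columns):

```rust
use std::collections::BTreeMap;
use std::thread;

// Fan per-segment grouped counts out to threads, then merge the partial
// maps into the final (group key -> count) result.
fn grouped_counts(segments: &[Vec<&str>]) -> BTreeMap<String, u64> {
    let mut merged: BTreeMap<String, u64> = BTreeMap::new();
    thread::scope(|s| {
        let handles: Vec<_> = segments
            .iter()
            .map(|seg| {
                s.spawn(move || {
                    let mut partial: BTreeMap<String, u64> = BTreeMap::new();
                    for key in seg {
                        *partial.entry(key.to_string()).or_insert(0) += 1;
                    }
                    partial
                })
            })
            .collect();
        for handle in handles {
            for (key, count) in handle.join().unwrap() {
                *merged.entry(key).or_insert(0) += count;
            }
        }
    });
    merged
}
```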