refactor: return error for wrong group columns
parent
cadc92dac9
commit
585213e51f
|
@ -4,7 +4,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use internal_types::selection::Selection;
|
use internal_types::selection::Selection;
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
|
|
||||||
use crate::row_group::RowGroup;
|
use crate::row_group::RowGroup;
|
||||||
use crate::row_group::{ColumnName, Predicate};
|
use crate::row_group::{ColumnName, Predicate};
|
||||||
|
@ -184,9 +184,7 @@ impl Chunk {
|
||||||
let table = chunk_data
|
let table = chunk_data
|
||||||
.data
|
.data
|
||||||
.get(table_name)
|
.get(table_name)
|
||||||
.ok_or(Error::TableNotFound {
|
.context(TableNotFound { table_name })?;
|
||||||
table_name: table_name.to_owned(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(table.read_filter(select_columns, predicate))
|
Ok(table.read_filter(select_columns, predicate))
|
||||||
}
|
}
|
||||||
|
@ -195,7 +193,7 @@ impl Chunk {
|
||||||
/// columns, optionally filtered by the provided predicate. Results are
|
/// columns, optionally filtered by the provided predicate. Results are
|
||||||
/// merged across all row groups within the returned table.
|
/// merged across all row groups within the returned table.
|
||||||
///
|
///
|
||||||
/// Returns `None` if the table no longer exists within the chunk.
|
/// Returns an error if the specified table does not exist.
|
||||||
///
|
///
|
||||||
/// Note: `read_aggregate` currently only supports grouping on "tag"
|
/// Note: `read_aggregate` currently only supports grouping on "tag"
|
||||||
/// columns.
|
/// columns.
|
||||||
|
@ -205,17 +203,18 @@ impl Chunk {
|
||||||
predicate: Predicate,
|
predicate: Predicate,
|
||||||
group_columns: &Selection<'_>,
|
group_columns: &Selection<'_>,
|
||||||
aggregates: &[(ColumnName<'_>, AggregateType)],
|
aggregates: &[(ColumnName<'_>, AggregateType)],
|
||||||
) -> Option<table::ReadAggregateResults> {
|
) -> Result<table::ReadAggregateResults> {
|
||||||
// read lock on chunk.
|
// read lock on chunk.
|
||||||
let chunk_data = self.chunk_data.read().unwrap();
|
let chunk_data = self.chunk_data.read().unwrap();
|
||||||
|
|
||||||
// Lookup table by name and dispatch execution.
|
let table = chunk_data
|
||||||
//
|
|
||||||
// TODO(edd): this should return an error
|
|
||||||
chunk_data
|
|
||||||
.data
|
.data
|
||||||
.get(table_name)
|
.get(table_name)
|
||||||
.map(|table| table.read_aggregate(predicate, group_columns, aggregates))
|
.context(TableNotFound { table_name })?;
|
||||||
|
|
||||||
|
table
|
||||||
|
.read_aggregate(predicate, group_columns, aggregates)
|
||||||
|
.context(TableError)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
|
@ -363,12 +363,12 @@ impl Database {
|
||||||
// Get all relevant row groups for this chunk's table. This
|
// Get all relevant row groups for this chunk's table. This
|
||||||
// is cheap because it doesn't execute the read operation,
|
// is cheap because it doesn't execute the read operation,
|
||||||
// but just gets references to the needed to data to do so.
|
// but just gets references to the needed to data to do so.
|
||||||
if let Some(table_results) =
|
let table_results = chunk
|
||||||
chunk.read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates)
|
.read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates)
|
||||||
{
|
.context(ChunkError)?;
|
||||||
|
|
||||||
chunk_table_results.push(table_results);
|
chunk_table_results.push(table_results);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(ReadAggregateResults::new(chunk_table_results))
|
Ok(ReadAggregateResults::new(chunk_table_results))
|
||||||
}
|
}
|
||||||
|
|
|
@ -228,7 +228,7 @@ impl Table {
|
||||||
predicate: Predicate,
|
predicate: Predicate,
|
||||||
group_columns: &'input Selection<'_>,
|
group_columns: &'input Selection<'_>,
|
||||||
aggregates: &'input [(ColumnName<'input>, AggregateType)],
|
aggregates: &'input [(ColumnName<'input>, AggregateType)],
|
||||||
) -> ReadAggregateResults {
|
) -> Result<ReadAggregateResults> {
|
||||||
let (meta, row_groups) = self.filter_row_groups(&predicate);
|
let (meta, row_groups) = self.filter_row_groups(&predicate);
|
||||||
|
|
||||||
// Filter out any column names that we do not have data for.
|
// Filter out any column names that we do not have data for.
|
||||||
|
@ -241,13 +241,24 @@ impl Table {
|
||||||
..ResultSchema::default()
|
..ResultSchema::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Check all grouping columns are valid for grouping operation.
|
||||||
|
for (ct, _) in &schema.group_columns {
|
||||||
|
ensure!(
|
||||||
|
matches!(ct, ColumnType::Tag(_)),
|
||||||
|
UnsupportedColumnOperation {
|
||||||
|
msg: format!("column type must be ColumnType::Tag, got {:?}", ct),
|
||||||
|
column_name: ct.as_str().to_string(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// return the iterator to build the results.
|
// return the iterator to build the results.
|
||||||
ReadAggregateResults {
|
Ok(ReadAggregateResults {
|
||||||
schema,
|
schema,
|
||||||
predicate,
|
predicate,
|
||||||
row_groups,
|
row_groups,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns aggregates segmented by grouping keys and windowed by time.
|
/// Returns aggregates segmented by grouping keys and windowed by time.
|
||||||
|
@ -1064,11 +1075,13 @@ mod test {
|
||||||
table.add_row_group(rg);
|
table.add_row_group(rg);
|
||||||
|
|
||||||
// no predicate aggregate
|
// no predicate aggregate
|
||||||
let mut results = table.read_aggregate(
|
let mut results = table
|
||||||
|
.read_aggregate(
|
||||||
Predicate::default(),
|
Predicate::default(),
|
||||||
&Selection::Some(&[]),
|
&Selection::Some(&[]),
|
||||||
&[("time", AggregateType::Count), ("time", AggregateType::Sum)],
|
&[("time", AggregateType::Count), ("time", AggregateType::Sum)],
|
||||||
);
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// check the column result schema
|
// check the column result schema
|
||||||
let exp_schema = ResultSchema {
|
let exp_schema = ResultSchema {
|
||||||
|
@ -1095,17 +1108,31 @@ mod test {
|
||||||
assert!(matches!(results.next_merged_result(), None));
|
assert!(matches!(results.next_merged_result(), None));
|
||||||
|
|
||||||
// apply a predicate
|
// apply a predicate
|
||||||
let mut results = table.read_aggregate(
|
let mut results = table
|
||||||
|
.read_aggregate(
|
||||||
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
|
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
|
||||||
&Selection::Some(&[]),
|
&Selection::Some(&[]),
|
||||||
&[("time", AggregateType::Count), ("time", AggregateType::Sum)],
|
&[("time", AggregateType::Count), ("time", AggregateType::Sum)],
|
||||||
);
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(),
|
DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(),
|
||||||
"time_count,time_sum\n2,300\n",
|
"time_count,time_sum\n2,300\n",
|
||||||
);
|
);
|
||||||
assert!(matches!(results.next_merged_result(), None));
|
assert!(matches!(results.next_merged_result(), None));
|
||||||
|
|
||||||
|
// group on wrong columns.
|
||||||
|
let results = table.read_aggregate(
|
||||||
|
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
|
||||||
|
&Selection::Some(&["time"]),
|
||||||
|
&[("min", AggregateType::Min)],
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(matches!(
|
||||||
|
&results,
|
||||||
|
Err(Error::UnsupportedColumnOperation { .. })
|
||||||
|
),);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue