diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 506b0d3338..e02d999e42 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -4,7 +4,7 @@ use std::{ }; use internal_types::selection::Selection; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use crate::row_group::RowGroup; use crate::row_group::{ColumnName, Predicate}; @@ -184,9 +184,7 @@ impl Chunk { let table = chunk_data .data .get(table_name) - .ok_or(Error::TableNotFound { - table_name: table_name.to_owned(), - })?; + .context(TableNotFound { table_name })?; Ok(table.read_filter(select_columns, predicate)) } @@ -195,7 +193,7 @@ impl Chunk { /// columns, optionally filtered by the provided predicate. Results are /// merged across all row groups within the returned table. /// - /// Returns `None` if the table no longer exists within the chunk. + /// Returns an error if the specified table does not exist. /// /// Note: `read_aggregate` currently only supports grouping on "tag" /// columns. @@ -205,17 +203,18 @@ impl Chunk { predicate: Predicate, group_columns: &Selection<'_>, aggregates: &[(ColumnName<'_>, AggregateType)], - ) -> Option<table::ReadAggregateResults> { + ) -> Result<table::ReadAggregateResults> { // read lock on chunk. let chunk_data = self.chunk_data.read().unwrap(); - // Lookup table by name and dispatch execution. - // - // TODO(edd): this should return an error - chunk_data + let table = chunk_data .data .get(table_name) - .map(|table| table.read_aggregate(predicate, group_columns, aggregates)) + .context(TableNotFound { table_name })?; + + table + .read_aggregate(predicate, group_columns, aggregates) + .context(TableError) } // diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index af21e2d92a..23d2372ee6 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -363,11 +363,11 @@ impl Database { // Get all relevant row groups for this chunk's table. 
This // is cheap because it doesn't execute the read operation, // but just gets references to the needed to data to do so. - if let Some(table_results) = - chunk.read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates) - { - chunk_table_results.push(table_results); - } + let table_results = chunk + .read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates) + .context(ChunkError)?; + + chunk_table_results.push(table_results); } Ok(ReadAggregateResults::new(chunk_table_results)) diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 3ebf687e80..7e68a13857 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -228,7 +228,7 @@ impl Table { predicate: Predicate, group_columns: &'input Selection<'_>, aggregates: &'input [(ColumnName<'input>, AggregateType)], - ) -> ReadAggregateResults { + ) -> Result<ReadAggregateResults> { let (meta, row_groups) = self.filter_row_groups(&predicate); // Filter out any column names that we do not have data for. @@ -241,13 +241,24 @@ impl Table { ..ResultSchema::default() }; + // Check all grouping columns are valid for grouping operation. + for (ct, _) in &schema.group_columns { + ensure!( + matches!(ct, ColumnType::Tag(_)), + UnsupportedColumnOperation { + msg: format!("column type must be ColumnType::Tag, got {:?}", ct), + column_name: ct.as_str().to_string(), + }, + ) + } + // return the iterator to build the results. - ReadAggregateResults { + Ok(ReadAggregateResults { schema, predicate, row_groups, ..Default::default() - } + }) } /// Returns aggregates segmented by grouping keys and windowed by time. 
@@ -1064,11 +1075,13 @@ mod test { table.add_row_group(rg); // no predicate aggregate - let mut results = table.read_aggregate( - Predicate::default(), - &Selection::Some(&[]), - &[("time", AggregateType::Count), ("time", AggregateType::Sum)], - ); + let mut results = table + .read_aggregate( + Predicate::default(), + &Selection::Some(&[]), + &[("time", AggregateType::Count), ("time", AggregateType::Sum)], + ) + .unwrap(); // check the column result schema let exp_schema = ResultSchema { @@ -1095,17 +1108,31 @@ mod test { assert!(matches!(results.next_merged_result(), None)); // apply a predicate - let mut results = table.read_aggregate( - Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), - &Selection::Some(&[]), - &[("time", AggregateType::Count), ("time", AggregateType::Sum)], - ); + let mut results = table + .read_aggregate( + Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), + &Selection::Some(&[]), + &[("time", AggregateType::Count), ("time", AggregateType::Sum)], + ) + .unwrap(); assert_eq!( DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(), "time_count,time_sum\n2,300\n", ); assert!(matches!(results.next_merged_result(), None)); + + // group on wrong columns. + let results = table.read_aggregate( + Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), + &Selection::Some(&["time"]), + &[("min", AggregateType::Min)], + ); + + assert!(matches!( + &results, + Err(Error::UnsupportedColumnOperation { .. }) + ),); } #[test]