diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 506b0d3338..e02d999e42 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -4,7 +4,7 @@ use std::{ }; use internal_types::selection::Selection; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use crate::row_group::RowGroup; use crate::row_group::{ColumnName, Predicate}; @@ -184,9 +184,7 @@ impl Chunk { let table = chunk_data .data .get(table_name) - .ok_or(Error::TableNotFound { - table_name: table_name.to_owned(), - })?; + .context(TableNotFound { table_name })?; Ok(table.read_filter(select_columns, predicate)) } @@ -195,7 +193,7 @@ impl Chunk { /// columns, optionally filtered by the provided predicate. Results are /// merged across all row groups within the returned table. /// - /// Returns `None` if the table no longer exists within the chunk. + /// Returns an error if the specified table does not exist. /// /// Note: `read_aggregate` currently only supports grouping on "tag" /// columns. @@ -205,17 +203,18 @@ impl Chunk { predicate: Predicate, group_columns: &Selection<'_>, aggregates: &[(ColumnName<'_>, AggregateType)], - ) -> Option { + ) -> Result { // read lock on chunk. let chunk_data = self.chunk_data.read().unwrap(); - // Lookup table by name and dispatch execution. - // - // TODO(edd): this should return an error - chunk_data + let table = chunk_data .data .get(table_name) - .map(|table| table.read_aggregate(predicate, group_columns, aggregates)) + .context(TableNotFound { table_name })?; + + table + .read_aggregate(predicate, group_columns, aggregates) + .context(TableError) } // diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 7260754dc1..e2fddb5df0 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -1291,8 +1291,6 @@ mod test { use super::*; use arrow_deps::arrow::array::{Int64Array, StringArray}; - use crate::value::AggregateResult; - #[test] fn row_ids_intersect() { let mut row_ids = RowIDs::new_bitmap(); @@ -2190,74 +2188,6 @@ mod test { assert_eq!(col.count(&[0, 2][..]), 0); } - #[test] - fn aggregate_result() { - let mut res = AggregateResult::Count(0); - res.update(Value::Null); - assert!(matches!(res, AggregateResult::Count(0))); - res.update(Value::String("hello")); - assert!(matches!(res, AggregateResult::Count(1))); - - let mut res = AggregateResult::Min(Value::Null); - res.update(Value::String("Dance Yrself Clean")); - assert!(matches!( - res, - AggregateResult::Min(Value::String("Dance Yrself Clean")) - )); - res.update(Value::String("All My Friends")); - assert!(matches!( - res, - AggregateResult::Min(Value::String("All My Friends")) - )); - res.update(Value::String("Dance Yrself Clean")); - assert!(matches!( - res, - AggregateResult::Min(Value::String("All My Friends")) - )); - res.update(Value::Null); - assert!(matches!( - res, - AggregateResult::Min(Value::String("All My Friends")) - )); - - let mut res = AggregateResult::Max(Value::Null); - res.update(Value::Scalar(Scalar::I64(20))); - assert!(matches!( - res, - AggregateResult::Max(Value::Scalar(Scalar::I64(20))) - )); - res.update(Value::Scalar(Scalar::I64(39))); - assert!(matches!( - res, - AggregateResult::Max(Value::Scalar(Scalar::I64(39))) - )); - res.update(Value::Scalar(Scalar::I64(20))); - assert!(matches!( - res, - AggregateResult::Max(Value::Scalar(Scalar::I64(39))) - )); - res.update(Value::Null); - assert!(matches!( - res, - AggregateResult::Max(Value::Scalar(Scalar::I64(39))) - )); - - let mut res = AggregateResult::Sum(Scalar::Null); - res.update(Value::Null); - assert!(matches!(res, AggregateResult::Sum(Scalar::Null))); - res.update(Value::Scalar(Scalar::Null)); - assert!(matches!(res, AggregateResult::Sum(Scalar::Null))); - - res.update(Value::Scalar(Scalar::I64(20))); - assert!(matches!(res, AggregateResult::Sum(Scalar::I64(20)))); - - res.update(Value::Scalar(Scalar::I64(-5))); - assert!(matches!(res, AggregateResult::Sum(Scalar::I64(15)))); - - res.update(Value::Scalar(Scalar::Null)); - assert!(matches!(res, AggregateResult::Sum(Scalar::I64(15)))); - } - #[test] fn has_non_null_value() { // Check each column type is wired up. Actual logic is tested in encoders. diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index af21e2d92a..23d2372ee6 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -363,11 +363,11 @@ impl Database { // Get all relevant row groups for this chunk's table. This // is cheap because it doesn't execute the read operation, // but just gets references to the needed to data to do so. - if let Some(table_results) = - chunk.read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates) - { - chunk_table_results.push(table_results); - } + let table_results = chunk + .read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates) + .context(ChunkError)?; + + chunk_table_results.push(table_results); } Ok(ReadAggregateResults::new(chunk_table_results)) diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 6ae8ab3f26..2a4f112bb0 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -15,7 +15,7 @@ use crate::column::{cmp::Operator, Column, RowIDs, RowIDsOption}; use crate::schema; use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; use crate::value::{ - AggregateResult, AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator, + AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator, }; use arrow_deps::arrow::record_batch::RecordBatch; use arrow_deps::{ @@ -894,7 +894,7 @@ impl RowGroup { // // In this case the rows are already in "group key order" and the aggregates // can be calculated by reading the rows in order. - fn read_group_sorted_stream( + fn _read_group_sorted_stream( &self, _predicates: &Predicate, _group_column: ColumnName<'_>, @@ -924,38 +924,30 @@ impl RowGroup { }, }; - // References to the columns to be used as input for producing the - // output aggregates. Also returns the required aggregate type. - let input_aggregate_columns = dst + dst.aggregate_cols = dst .schema .aggregate_columns .iter() - .map(|(col_type, agg_type, _)| (self.column_by_name(col_type.as_str()), *agg_type)) - .collect::>(); + .map(|(col_type, agg_type, data_type)| { + let col = self.column_by_name(col_type.as_str()); // input aggregate column + let mut agg_vec = AggregateVec::from((agg_type, data_type)); - let mut output_aggregate_columns = dst - .schema - .aggregate_columns - .iter() - .map(|(_, agg_type, data_type)| AggregateVec::from((agg_type, data_type))) + // produce single aggregate for the input column subject to a + // predicate filter. + match agg_type { + AggregateType::Count => { + let value = Value::Scalar(Scalar::U64(col.count(&row_ids) as u64)); + agg_vec.push(value); + } + AggregateType::First => unimplemented!("First not yet implemented"), + AggregateType::Last => unimplemented!("Last not yet implemented"), + AggregateType::Min => agg_vec.push(col.min(&row_ids)), + AggregateType::Max => agg_vec.push(col.max(&row_ids)), + AggregateType::Sum => agg_vec.push(Value::Scalar(col.sum(&row_ids))), + } + agg_vec + }) .collect::>(); - - for (i, (col, agg_type)) in input_aggregate_columns.iter().enumerate() { - match agg_type { - AggregateType::Count => { - let value = Value::Scalar(Scalar::U64(col.count(&row_ids) as u64)); - output_aggregate_columns[i].push(value); - } - AggregateType::First => unimplemented!("First not yet implemented"), - AggregateType::Last => unimplemented!("Last not yet implemented"), - AggregateType::Min => output_aggregate_columns[i].push(col.min(&row_ids)), - AggregateType::Max => output_aggregate_columns[i].push(col.max(&row_ids)), - AggregateType::Sum => { - output_aggregate_columns[i].push(Value::Scalar(col.sum(&row_ids))) - } - } - } - dst.aggregate_cols = output_aggregate_columns; } /// Given the predicate (which may be empty), determine a set of rows @@ -1154,6 +1146,7 @@ fn pack_u32_in_u128(packed_value: u128, encoded_id: u32, pos: usize) -> u128 { // Given a packed encoded group key, unpacks them into `n` individual `u32` // group keys, and stores them in `dst`. It is the caller's responsibility to // ensure n <= 4. +#[cfg(test)] fn unpack_u128_group_key(group_key_packed: u128, n: usize, mut dst: Vec) -> Vec { dst.resize(n, 0); @@ -1372,31 +1365,6 @@ impl TryFrom<&DfExpr> for BinaryExpr { } } -#[derive(Debug, PartialEq, Clone)] -pub struct AggregateResults<'row_group>(Vec>); - -impl<'row_group> AggregateResults<'row_group> { - fn len(&self) -> usize { - self.0.len() - } - - fn merge(&mut self, other: &AggregateResults<'row_group>) { - assert_eq!(self.0.len(), other.len()); - for (i, agg) in self.0.iter_mut().enumerate() { - agg.merge(&other.0[i]); - } - } -} - -impl<'a> IntoIterator for AggregateResults<'a> { - type Item = AggregateResult<'a>; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - // A representation of a column name. pub type ColumnName<'a> = &'a str; @@ -1545,11 +1513,6 @@ impl MetaData { self.columns_size += column_size; } - // Returns meta information about the column. - fn column_meta(&self, name: ColumnName<'_>) -> &ColumnMeta { - self.columns.get(name).unwrap() - } - // Extract schema information for a set of columns. fn schema_for_column_names( &self, diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 23f630cf81..7e68a13857 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -12,7 +12,7 @@ use snafu::{ensure, Snafu}; use crate::row_group::{self, ColumnName, Predicate, RowGroup}; use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}; -use crate::value::{AggregateResult, Scalar, Value}; +use crate::value::Value; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("cannot drop last row group in table; drop table"))] @@ -94,6 +94,8 @@ impl Table { row_groups.data.push(Arc::new(rg)); } + /// TODO(edd): wire up + /// /// Remove the row group at `position` from table, returning an error if the /// caller has attempted to drop the last row group. /// @@ -226,7 +228,7 @@ impl Table { predicate: Predicate, group_columns: &'input Selection<'_>, aggregates: &'input [(ColumnName<'input>, AggregateType)], - ) -> ReadAggregateResults { + ) -> Result { let (meta, row_groups) = self.filter_row_groups(&predicate); // Filter out any column names that we do not have data for. @@ -239,13 +241,24 @@ impl Table { ..ResultSchema::default() }; + // Check all grouping columns are valid for grouping operation. + for (ct, _) in &schema.group_columns { + ensure!( + matches!(ct, ColumnType::Tag(_)), + UnsupportedColumnOperation { + msg: format!("column type must be ColumnType::Tag, got {:?}", ct), + column_name: ct.as_str().to_string(), + }, + ) + } + // return the iterator to build the results. - ReadAggregateResults { + Ok(ReadAggregateResults { schema, predicate, row_groups, ..Default::default() - } + }) } /// Returns aggregates segmented by grouping keys and windowed by time. @@ -273,76 +286,15 @@ impl Table { _group_columns: Vec>, _aggregates: Vec<(ColumnName<'a>, AggregateType)>, _window: i64, - ) -> BTreeMap, Vec<(ColumnName<'a>, AggregateResult<'_>)>> { + ) -> BTreeMap, Vec<(ColumnName<'a>, ReadAggregateResults)>> { // identify segments where time range and predicates match could match // using segment meta data, and then execute against those segments and // merge results. todo!() } - // Perform aggregates without any grouping. Filtering on optional predicates - // and time range is still supported. - fn read_aggregate_no_group<'a>( - &self, - time_range: (i64, i64), - predicates: &[(&str, &str)], - aggregates: Vec<(ColumnName<'a>, AggregateType)>, - ) -> Vec<(ColumnName<'a>, AggregateResult<'_>)> { - // The fast path where there are no predicates or a time range to apply. - // We just want the equivalent of column statistics. - if predicates.is_empty() { - let mut results = Vec::with_capacity(aggregates.len()); - for (col_name, agg_type) in &aggregates { - match agg_type { - AggregateType::Count => { - results.push(( - col_name, - AggregateResult::Count(self.count(col_name, time_range)), - )); - } - AggregateType::First => { - results.push(( - col_name, - AggregateResult::First(self.first(col_name, time_range.0)), - )); - } - AggregateType::Last => { - results.push(( - col_name, - AggregateResult::Last(self.last(col_name, time_range.1)), - )); - } - AggregateType::Min => { - results.push(( - col_name, - AggregateResult::Min(self.min(col_name, time_range)), - )); - } - AggregateType::Max => { - results.push(( - col_name, - AggregateResult::Max(self.max(col_name, time_range)), - )); - } - AggregateType::Sum => { - let res = match self.sum(col_name, time_range) { - Some(x) => x, - None => Scalar::Null, - }; - - results.push((col_name, AggregateResult::Sum(res))); - } - } - } - } - - // Otherwise we have predicates so for each segment we will execute a - // generalised aggregation method and build up the result set. - todo!(); - } - // - // ---- Fast-path aggregations on single columns. + // ---- Fast-path first/last selectors. // // Returns the first value for the specified column across the table @@ -387,44 +339,6 @@ impl Table { todo!(); } - /// The minimum non-null value in the column for the table. - fn min(&self, _column_name: &str, _time_range: (i64, i64)) -> Value<'_> { - // Loop over segments, skipping any that don't satisfy the time range. - // Any segments completely overlapped can have a candidate min taken - // directly from their zone map. Partially overlapped segments will be - // read using the appropriate execution API. - // - // Return the min of minimums. - todo!(); - } - - /// The maximum non-null value in the column for the table. - fn max(&self, _column_name: &str, _time_range: (i64, i64)) -> Value<'_> { - // Loop over segments, skipping any that don't satisfy the time range. - // Any segments completely overlapped can have a candidate max taken - // directly from their zone map. Partially overlapped segments will be - // read using the appropriate execution API. - // - // Return the max of maximums. - todo!(); - } - - /// The number of non-null values in the column for the table. - fn count(&self, _column_name: &str, _time_range: (i64, i64)) -> u64 { - // Loop over segments, skipping any that don't satisfy the time range. - // Execute appropriate aggregation call on each segment and aggregate - // the results. - todo!(); - } - - /// The total sum of non-null values in the column for the table. - fn sum(&self, _column_name: &str, _time_range: (i64, i64)) -> Option { - // Loop over segments, skipping any that don't satisfy the time range. - // Execute appropriate aggregation call on each segment and aggregate - // the results. - todo!(); - } - // // ---- Schema API queries // @@ -500,36 +414,6 @@ impl Table { Ok(dst) } - /// Determines if this table could satisfy the provided predicate. - /// - /// `false` is proof that no row within this table would match the - /// predicate, whilst `true` indicates one or more rows *might* match the - /// predicate. - fn could_satisfy_predicate(&self, predicate: &Predicate) -> bool { - // Get a snapshot of the table data under a read lock. - let (meta, row_groups) = { - let table_data = self.table_data.read().unwrap(); - (Arc::clone(&table_data.meta), table_data.data.to_vec()) - }; - - // if the table doesn't have a column for one of the predicate's - // expressions then the table cannot satisfy the predicate. - if !predicate - .iter() - .all(|expr| meta.columns.contains_key(expr.column())) - { - return false; - } - - // If there is a single row group in the table that could satisfy the - // predicate then the table itself could satisfy the predicate so return - // true. If none of the row groups could match then return false. - let exprs = predicate.expressions(); - row_groups - .iter() - .any(|row_group| row_group.could_satisfy_conjunctive_binary_expressions(exprs)) - } - /// Determines if this table contains one or more rows that satisfy the /// predicate. pub fn satisfies_predicate(&self, predicate: &Predicate) -> bool { @@ -1191,11 +1075,13 @@ mod test { table.add_row_group(rg); // no predicate aggregate - let mut results = table.read_aggregate( - Predicate::default(), - &Selection::Some(&[]), - &[("time", AggregateType::Count), ("time", AggregateType::Sum)], - ); + let mut results = table + .read_aggregate( + Predicate::default(), + &Selection::Some(&[]), + &[("time", AggregateType::Count), ("time", AggregateType::Sum)], + ) + .unwrap(); // check the column result schema let exp_schema = ResultSchema { @@ -1222,17 +1108,31 @@ mod test { assert!(matches!(results.next_merged_result(), None)); // apply a predicate - let mut results = table.read_aggregate( - Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), - &Selection::Some(&[]), - &[("time", AggregateType::Count), ("time", AggregateType::Sum)], - ); + let mut results = table + .read_aggregate( + Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), + &Selection::Some(&[]), + &[("time", AggregateType::Count), ("time", AggregateType::Sum)], + ) + .unwrap(); assert_eq!( DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(), "time_count,time_sum\n2,300\n", ); assert!(matches!(results.next_merged_result(), None)); + + // group on wrong columns. + let results = table.read_aggregate( + Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), + &Selection::Some(&["time"]), + &[("min", AggregateType::Min)], + ); + + assert!(matches!( + &results, + Err(Error::UnsupportedColumnOperation { .. }) + ),); } #[test] diff --git a/read_buffer/src/value.rs b/read_buffer/src/value.rs index 083c5055fd..c7adf11952 100644 --- a/read_buffer/src/value.rs +++ b/read_buffer/src/value.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, convert::TryFrom, fmt::Formatter}; +use std::{convert::TryFrom, fmt::Formatter}; use std::{mem::size_of, sync::Arc}; use arrow_deps::arrow; @@ -945,330 +945,6 @@ impl From<(&AggregateType, &LogicalDataType)> for AggregateVec { } } -/// These variants hold aggregates, which are the results of applying aggregates -/// to column data. -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum AggregateResult<'a> { - // Any type of column can have rows counted. NULL values do not contribute - // to the count. If all rows are NULL then count will be `0`. - Count(u64), - - // Only numerical columns with scalar values can be summed. NULL values do - // not contribute to the sum, but if all rows are NULL then the sum is - // itself NULL (represented by `None`). - Sum(Scalar), - - // The minimum value in the column data. - Min(Value<'a>), - - // The maximum value in the column data. - Max(Value<'a>), - - // The first value in the column data and the corresponding timestamp. - First(Option<(i64, Value<'a>)>), - - // The last value in the column data and the corresponding timestamp. - Last(Option<(i64, Value<'a>)>), -} - -#[allow(unused_assignments)] -impl<'a> AggregateResult<'a> { - pub fn update(&mut self, other: Value<'a>) { - if other.is_null() { - // a NULL value has no effect on aggregates - return; - } - - match self { - Self::Count(v) => { - if !other.is_null() { - *v += 1; - } - } - Self::Min(v) => match (&v, &other) { - (Value::Null, _) => { - // something is always smaller than NULL - *v = other; - } - (Value::String(_), Value::Null) => {} // do nothing - (Value::String(a), Value::String(b)) => { - if a.cmp(b) == std::cmp::Ordering::Greater { - *v = other; - } - } - (Value::String(a), Value::ByteArray(b)) => { - if a.as_bytes().cmp(b) == std::cmp::Ordering::Greater { - *v = other; - } - } - (Value::ByteArray(_), Value::Null) => {} // do nothing - (Value::ByteArray(a), Value::String(b)) => { - if a.cmp(&b.as_bytes()) == std::cmp::Ordering::Greater { - *v = other; - } - } - (Value::ByteArray(a), Value::ByteArray(b)) => { - if a.cmp(b) == std::cmp::Ordering::Greater { - *v = other; - } - } - (Value::Scalar(_), Value::Null) => {} // do nothing - (Value::Scalar(a), Value::Scalar(b)) => { - if a > b { - *v = other; - } - } - (_, _) => unreachable!("not a possible variant combination"), - }, - Self::Max(v) => match (&v, &other) { - (Value::Null, _) => { - // something is always larger than NULL - *v = other; - } - (Value::String(_), Value::Null) => {} // do nothing - (Value::String(a), Value::String(b)) => { - if a.cmp(b) == std::cmp::Ordering::Less { - *v = other; - } - } - (Value::String(a), Value::ByteArray(b)) => { - if a.as_bytes().cmp(b) == std::cmp::Ordering::Less { - *v = other; - } - } - (Value::ByteArray(_), Value::Null) => {} // do nothing - (Value::ByteArray(a), Value::String(b)) => { - if a.cmp(&b.as_bytes()) == std::cmp::Ordering::Less { - *v = other; - } - } - (Value::ByteArray(a), Value::ByteArray(b)) => { - if a.cmp(b) == std::cmp::Ordering::Less { - *v = other; - } - } - (Value::Scalar(_), Value::Null) => {} // do nothing - (Value::Scalar(a), Value::Scalar(b)) => { - if a < b { - *v = other; - } - } - (_, _) => unreachable!("not a possible variant combination"), - }, - Self::Sum(v) => match (&v, &other) { - (Scalar::Null, Value::Scalar(other_scalar)) => { - // NULL + something == something - *v = *other_scalar; - } - (_, Value::Scalar(b)) => *v += b, - (_, _) => unreachable!("not a possible variant combination"), - }, - _ => unimplemented!("First and Last aggregates not implemented yet"), - } - } - - /// Merge `other` into `self` - pub fn merge(&mut self, other: &AggregateResult<'a>) { - match (self, other) { - (AggregateResult::Count(this), AggregateResult::Count(that)) => *this += *that, - (AggregateResult::Sum(this), AggregateResult::Sum(that)) => *this += that, - (AggregateResult::Min(this), AggregateResult::Min(that)) => { - if *this > *that { - *this = *that; - } - } - (AggregateResult::Max(this), AggregateResult::Max(that)) => { - if *this < *that { - *this = *that; - } - } - (a, b) => unimplemented!("merging {:?} into {:?} not yet implemented", b, a), - } - } - - pub fn try_as_str(&self) -> Option<&str> { - match &self { - AggregateResult::Min(v) => match v { - Value::Null => None, - Value::String(s) => Some(s), - v => panic!("cannot convert {:?} to &str", v), - }, - AggregateResult::Max(v) => match v { - Value::Null => None, - Value::String(s) => Some(s), - v => panic!("cannot convert {:?} to &str", v), - }, - AggregateResult::First(_) => panic!("cannot convert first tuple to &str"), - AggregateResult::Last(_) => panic!("cannot convert last tuple to &str"), - AggregateResult::Sum(v) => panic!("cannot convert {:?} to &str", v), - AggregateResult::Count(_) => panic!("cannot convert count to &str"), - } - } - - pub fn try_as_bytes(&self) -> Option<&[u8]> { - match &self { - AggregateResult::Min(v) => match v { - Value::Null => None, - Value::ByteArray(s) => Some(s), - v => panic!("cannot convert {:?} to &[u8]", v), - }, - AggregateResult::Max(v) => match v { - Value::Null => None, - Value::ByteArray(s) => Some(s), - v => panic!("cannot convert {:?} to &[u8]", v), - }, - AggregateResult::First(_) => panic!("cannot convert first tuple to &[u8]"), - AggregateResult::Last(_) => panic!("cannot convert last tuple to &[u8]"), - AggregateResult::Sum(v) => panic!("cannot convert {:?} to &[u8]", v), - AggregateResult::Count(_) => panic!("cannot convert count to &[u8]"), - } - } - - pub fn try_as_bool(&self) -> Option { - match &self { - AggregateResult::Min(v) => match v { - Value::Null => None, - Value::Boolean(s) => Some(*s), - v => panic!("cannot convert {:?} to bool", v), - }, - AggregateResult::Max(v) => match v { - Value::Null => None, - Value::Boolean(s) => Some(*s), - v => panic!("cannot convert {:?} to bool", v), - }, - AggregateResult::First(_) => panic!("cannot convert first tuple to bool"), - AggregateResult::Last(_) => panic!("cannot convert last tuple to bool"), - AggregateResult::Sum(v) => panic!("cannot convert {:?} to bool", v), - AggregateResult::Count(_) => panic!("cannot convert count to bool"), - } - } - - pub fn try_as_i64_scalar(&self) -> Option { - match &self { - AggregateResult::Sum(v) => match v { - Scalar::Null => None, - Scalar::I64(v) => Some(*v), - v => panic!("cannot convert {:?} to i64", v), - }, - AggregateResult::Min(v) => match v { - Value::Null => None, - Value::Scalar(s) => match s { - Scalar::Null => None, - Scalar::I64(v) => Some(*v), - v => panic!("cannot convert {:?} to u64", v), - }, - v => panic!("cannot convert {:?} to i64", v), - }, - AggregateResult::Max(v) => match v { - Value::Null => None, - Value::Scalar(s) => match s { - Scalar::Null => None, - Scalar::I64(v) => Some(*v), - v => panic!("cannot convert {:?} to u64", v), - }, - v => panic!("cannot convert {:?} to i64", v), - }, - AggregateResult::First(_) => panic!("cannot convert first tuple to scalar"), - AggregateResult::Last(_) => panic!("cannot convert last tuple to scalar"), - AggregateResult::Count(_) => panic!("cannot represent count as i64"), - } - } - - pub fn try_as_u64_scalar(&self) -> Option { - match &self { - AggregateResult::Sum(v) => match v { - Scalar::Null => None, - Scalar::U64(v) => Some(*v), - v => panic!("cannot convert {:?} to u64", v), - }, - AggregateResult::Count(c) => Some(*c), - AggregateResult::Min(v) => match v { - Value::Null => None, - Value::Scalar(s) => match s { - Scalar::Null => None, - Scalar::U64(v) => Some(*v), - v => panic!("cannot convert {:?} to u64", v), - }, - v => panic!("cannot convert {:?} to u64", v), - }, - AggregateResult::Max(v) => match v { - Value::Null => None, - Value::Scalar(s) => match s { - Scalar::Null => None, - Scalar::U64(v) => Some(*v), - v => panic!("cannot convert {:?} to u64", v), - }, - v => panic!("cannot convert {:?} to u64", v), - }, - AggregateResult::First(_) => panic!("cannot convert first tuple to scalar"), - AggregateResult::Last(_) => panic!("cannot convert last tuple to scalar"), - } - } - - pub fn try_as_f64_scalar(&self) -> Option { - match &self { - AggregateResult::Sum(v) => match v { - Scalar::Null => None, - Scalar::F64(v) => Some(*v), - v => panic!("cannot convert {:?} to f64", v), - }, - AggregateResult::Min(v) => match v { - Value::Null => None, - Value::Scalar(s) => match s { - Scalar::Null => None, - Scalar::F64(v) => Some(*v), - v => panic!("cannot convert {:?} to f64", v), - }, - v => panic!("cannot convert {:?} to f64", v), - }, - AggregateResult::Max(v) => match v { - Value::Null => None, - Value::Scalar(s) => match s { - Scalar::Null => None, - Scalar::F64(v) => Some(*v), - v => panic!("cannot convert {:?} to f64", v), - }, - v => panic!("cannot convert {:?} to f64", v), - }, - AggregateResult::First(_) => panic!("cannot convert first tuple to scalar"), - AggregateResult::Last(_) => panic!("cannot convert last tuple to scalar"), - AggregateResult::Count(_) => panic!("cannot represent count as f64"), - } - } -} - -impl From<&AggregateType> for AggregateResult<'_> { - fn from(typ: &AggregateType) -> Self { - match typ { - AggregateType::Count => Self::Count(0), - AggregateType::First => Self::First(None), - AggregateType::Last => Self::Last(None), - AggregateType::Min => Self::Min(Value::Null), - AggregateType::Max => Self::Max(Value::Null), - AggregateType::Sum => Self::Sum(Scalar::Null), - } - } -} - -impl std::fmt::Display for AggregateResult<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - AggregateResult::Count(v) => write!(f, "{}", v), - AggregateResult::First(v) => match v { - Some((_, v)) => write!(f, "{}", v), - None => write!(f, "NULL"), - }, - AggregateResult::Last(v) => match v { - Some((_, v)) => write!(f, "{}", v), - None => write!(f, "NULL"), - }, - AggregateResult::Min(v) => write!(f, "{}", v), - AggregateResult::Max(v) => write!(f, "{}", v), - AggregateResult::Sum(v) => write!(f, "{}", v), - } - } -} - /// A scalar is a numerical value that can be aggregated. #[derive(Debug, PartialEq, PartialOrd, Copy, Clone)] pub enum Scalar { @@ -1837,15 +1513,6 @@ impl<'a> Iterator for ValuesIterator<'a> { } } -#[derive(PartialEq, Debug)] -pub enum ValueSet<'a> { - // UTF-8 valid unicode strings - String(BTreeSet>), - - // Arbitrary collections of bytes - ByteArray(BTreeSet>), -} - #[derive(Debug, PartialEq)] /// A representation of encoded values for a column. pub enum EncodedValues {