Merge pull request #1026 from influxdata/er/refactor/aggregate_tidy

refactor: remove some dead code

commit 2eb494585f
@@ -4,7 +4,7 @@ use std::{
 };
 
 use internal_types::selection::Selection;
-use snafu::{ResultExt, Snafu};
+use snafu::{OptionExt, ResultExt, Snafu};
 
-use crate::row_group::RowGroup;
+use crate::row_group::{ColumnName, Predicate};
@@ -184,9 +184,7 @@ impl Chunk {
         let table = chunk_data
             .data
             .get(table_name)
-            .ok_or(Error::TableNotFound {
-                table_name: table_name.to_owned(),
-            })?;
+            .context(TableNotFound { table_name })?;
 
         Ok(table.read_filter(select_columns, predicate))
     }
@@ -195,7 +193,7 @@ impl Chunk {
     /// columns, optionally filtered by the provided predicate. Results are
     /// merged across all row groups within the returned table.
     ///
-    /// Returns `None` if the table no longer exists within the chunk.
+    /// Returns an error if the specified table does not exist.
     ///
     /// Note: `read_aggregate` currently only supports grouping on "tag"
     /// columns.
@@ -205,17 +203,18 @@ impl Chunk {
         predicate: Predicate,
         group_columns: &Selection<'_>,
         aggregates: &[(ColumnName<'_>, AggregateType)],
-    ) -> Option<table::ReadAggregateResults> {
+    ) -> Result<table::ReadAggregateResults> {
         // read lock on chunk.
         let chunk_data = self.chunk_data.read().unwrap();
 
         // Lookup table by name and dispatch execution.
-        //
-        // TODO(edd): this should return an error
-        chunk_data
+        let table = chunk_data
             .data
             .get(table_name)
-            .map(|table| table.read_aggregate(predicate, group_columns, aggregates))
+            .context(TableNotFound { table_name })?;
+
+        table
+            .read_aggregate(predicate, group_columns, aggregates)
+            .context(TableError)
     }
 
     //
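The two hunks above replace hand-rolled `ok_or`/`map` plumbing with snafu's context selectors: `OptionExt::context` turns an `Option` into a `Result`, and `ResultExt::context` wraps an inner error as the `source` of a new variant. A minimal self-contained sketch of the pattern, assuming snafu's 0.6-era API (selector name equals variant name); the `Error` enum and functions below are illustrative, not the crate's real ones:

use std::collections::BTreeMap;

use snafu::{OptionExt, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // snafu generates a `TableNotFound` context selector from this variant.
    #[snafu(display("table '{}' does not exist", table_name))]
    TableNotFound { table_name: String },

    // Wraps a lower-level error as `source`, like `.context(TableError)` above.
    #[snafu(display("error processing table: {}", source))]
    TableError { source: std::io::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;

// `.context(...)` converts `None` into `Err(Error::TableNotFound { .. })`;
// the selector's String field accepts any `Into<String>`, so a `&str` works,
// which is why `.context(TableNotFound { table_name })` above needs no `.to_owned()`.
fn lookup<'a>(data: &'a BTreeMap<String, i64>, table_name: &str) -> Result<&'a i64> {
    data.get(table_name).context(TableNotFound { table_name })
}

// `.context(...)` on a `Result` attaches the io::Error as the variant's source.
fn read_config(path: &str) -> Result<String> {
    std::fs::read_to_string(path).context(TableError)
}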
@@ -1291,8 +1291,6 @@ mod test {
     use super::*;
     use arrow_deps::arrow::array::{Int64Array, StringArray};
 
-    use crate::value::AggregateResult;
-
     #[test]
     fn row_ids_intersect() {
         let mut row_ids = RowIDs::new_bitmap();
@@ -2190,74 +2188,6 @@ mod test {
         assert_eq!(col.count(&[0, 2][..]), 0);
     }
 
-    #[test]
-    fn aggregate_result() {
-        let mut res = AggregateResult::Count(0);
-        res.update(Value::Null);
-        assert!(matches!(res, AggregateResult::Count(0)));
-        res.update(Value::String("hello"));
-        assert!(matches!(res, AggregateResult::Count(1)));
-
-        let mut res = AggregateResult::Min(Value::Null);
-        res.update(Value::String("Dance Yrself Clean"));
-        assert!(matches!(
-            res,
-            AggregateResult::Min(Value::String("Dance Yrself Clean"))
-        ));
-        res.update(Value::String("All My Friends"));
-        assert!(matches!(
-            res,
-            AggregateResult::Min(Value::String("All My Friends"))
-        ));
-        res.update(Value::String("Dance Yrself Clean"));
-        assert!(matches!(
-            res,
-            AggregateResult::Min(Value::String("All My Friends"))
-        ));
-        res.update(Value::Null);
-        assert!(matches!(
-            res,
-            AggregateResult::Min(Value::String("All My Friends"))
-        ));
-
-        let mut res = AggregateResult::Max(Value::Null);
-        res.update(Value::Scalar(Scalar::I64(20)));
-        assert!(matches!(
-            res,
-            AggregateResult::Max(Value::Scalar(Scalar::I64(20)))
-        ));
-        res.update(Value::Scalar(Scalar::I64(39)));
-        assert!(matches!(
-            res,
-            AggregateResult::Max(Value::Scalar(Scalar::I64(39)))
-        ));
-        res.update(Value::Scalar(Scalar::I64(20)));
-        assert!(matches!(
-            res,
-            AggregateResult::Max(Value::Scalar(Scalar::I64(39)))
-        ));
-        res.update(Value::Null);
-        assert!(matches!(
-            res,
-            AggregateResult::Max(Value::Scalar(Scalar::I64(39)))
-        ));
-
-        let mut res = AggregateResult::Sum(Scalar::Null);
-        res.update(Value::Null);
-        assert!(matches!(res, AggregateResult::Sum(Scalar::Null)));
-        res.update(Value::Scalar(Scalar::Null));
-        assert!(matches!(res, AggregateResult::Sum(Scalar::Null)));
-
-        res.update(Value::Scalar(Scalar::I64(20)));
-        assert!(matches!(res, AggregateResult::Sum(Scalar::I64(20))));
-
-        res.update(Value::Scalar(Scalar::I64(-5)));
-        assert!(matches!(res, AggregateResult::Sum(Scalar::I64(15))));
-
-        res.update(Value::Scalar(Scalar::Null));
-        assert!(matches!(res, AggregateResult::Sum(Scalar::I64(15))));
-    }
-
     #[test]
     fn has_non_null_value() {
         // Check each column type is wired up. Actual logic is tested in encoders.
@@ -363,11 +363,11 @@ impl Database {
             // Get all relevant row groups for this chunk's table. This
             // is cheap because it doesn't execute the read operation,
             // but just gets references to the needed to data to do so.
-            if let Some(table_results) =
-                chunk.read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates)
-            {
-                chunk_table_results.push(table_results);
-            }
+            let table_results = chunk
+                .read_aggregate(table_name, predicate.clone(), &group_columns, &aggregates)
+                .context(ChunkError)?;
+
+            chunk_table_results.push(table_results);
         }
 
         Ok(ReadAggregateResults::new(chunk_table_results))
@@ -15,7 +15,7 @@ use crate::column::{cmp::Operator, Column, RowIDs, RowIDsOption};
 use crate::schema;
 use crate::schema::{AggregateType, LogicalDataType, ResultSchema};
 use crate::value::{
-    AggregateResult, AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator,
+    AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator,
 };
 use arrow_deps::arrow::record_batch::RecordBatch;
 use arrow_deps::{
@@ -894,7 +894,7 @@ impl RowGroup {
     //
     // In this case the rows are already in "group key order" and the aggregates
     // can be calculated by reading the rows in order.
-    fn read_group_sorted_stream(
+    fn _read_group_sorted_stream(
         &self,
         _predicates: &Predicate,
         _group_column: ColumnName<'_>,
@@ -924,38 +924,30 @@ impl RowGroup {
             },
         };
 
-        // References to the columns to be used as input for producing the
-        // output aggregates. Also returns the required aggregate type.
-        let input_aggregate_columns = dst
+        dst.aggregate_cols = dst
             .schema
             .aggregate_columns
             .iter()
-            .map(|(col_type, agg_type, _)| (self.column_by_name(col_type.as_str()), *agg_type))
-            .collect::<Vec<_>>();
+            .map(|(col_type, agg_type, data_type)| {
+                let col = self.column_by_name(col_type.as_str()); // input aggregate column
+                let mut agg_vec = AggregateVec::from((agg_type, data_type));
 
-        let mut output_aggregate_columns = dst
-            .schema
-            .aggregate_columns
-            .iter()
-            .map(|(_, agg_type, data_type)| AggregateVec::from((agg_type, data_type)))
+                // produce single aggregate for the input column subject to a
+                // predicate filter.
+                match agg_type {
+                    AggregateType::Count => {
+                        let value = Value::Scalar(Scalar::U64(col.count(&row_ids) as u64));
+                        agg_vec.push(value);
+                    }
+                    AggregateType::First => unimplemented!("First not yet implemented"),
+                    AggregateType::Last => unimplemented!("Last not yet implemented"),
+                    AggregateType::Min => agg_vec.push(col.min(&row_ids)),
+                    AggregateType::Max => agg_vec.push(col.max(&row_ids)),
+                    AggregateType::Sum => agg_vec.push(Value::Scalar(col.sum(&row_ids))),
+                }
+                agg_vec
+            })
             .collect::<Vec<_>>();
-
-        for (i, (col, agg_type)) in input_aggregate_columns.iter().enumerate() {
-            match agg_type {
-                AggregateType::Count => {
-                    let value = Value::Scalar(Scalar::U64(col.count(&row_ids) as u64));
-                    output_aggregate_columns[i].push(value);
-                }
-                AggregateType::First => unimplemented!("First not yet implemented"),
-                AggregateType::Last => unimplemented!("Last not yet implemented"),
-                AggregateType::Min => output_aggregate_columns[i].push(col.min(&row_ids)),
-                AggregateType::Max => output_aggregate_columns[i].push(col.max(&row_ids)),
-                AggregateType::Sum => {
-                    output_aggregate_columns[i].push(Value::Scalar(col.sum(&row_ids)))
-                }
-            }
-        }
-        dst.aggregate_cols = output_aggregate_columns;
     }
 
     /// Given the predicate (which may be empty), determine a set of rows
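The hunk above collapses two parallel vectors (input columns and output aggregates) plus an index-based loop into a single iterator pass that resolves each input column, builds its output vector, and fills it in one closure. A stripped-down sketch of the same single-pass shape; `AggVec`, `compute`, and the toy data layout are hypothetical stand-ins, not the crate's `AggregateVec` API:

use std::collections::BTreeMap;

// Hypothetical stand-ins for the crate's column and aggregate types.
#[derive(Debug, Clone, Copy)]
enum AggregateType { Count, Sum }

#[derive(Debug, Default)]
struct AggVec { values: Vec<i64> }

impl AggVec {
    fn push(&mut self, v: i64) { self.values.push(v); }
}

fn compute(
    columns: &[(&str, AggregateType)],
    data: &BTreeMap<&str, Vec<i64>>,
) -> Vec<AggVec> {
    // One pass: resolve the input column, build its output vector, and fill
    // it, instead of collecting inputs and outputs into separate Vecs first.
    columns
        .iter()
        .map(|(name, agg_type)| {
            let col = &data[name];
            let mut agg_vec = AggVec::default();
            match agg_type {
                AggregateType::Count => agg_vec.push(col.len() as i64),
                AggregateType::Sum => agg_vec.push(col.iter().sum()),
            }
            agg_vec
        })
        .collect()
}

Besides removing the intermediate allocation, this keeps each output vector and its source column in one scope, so the positional `output_aggregate_columns[i]` indexing disappears.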
@@ -1154,6 +1146,7 @@ fn pack_u32_in_u128(packed_value: u128, encoded_id: u32, pos: usize) -> u128 {
 // Given a packed encoded group key, unpacks them into `n` individual `u32`
 // group keys, and stores them in `dst`. It is the caller's responsibility to
 // ensure n <= 4.
+#[cfg(test)]
 fn unpack_u128_group_key(group_key_packed: u128, n: usize, mut dst: Vec<u32>) -> Vec<u32> {
     dst.resize(n, 0);
 
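Gating the helper with `#[cfg(test)]` compiles it only for test builds, so it no longer triggers dead-code warnings in the production build while staying available to the tests. A simplified, hypothetical unpack helper showing the same gating (not the crate's actual function):

// Compiled only under `cargo test`; release builds omit it entirely.
#[cfg(test)]
fn unpack_four(packed: u128) -> [u32; 4] {
    [
        packed as u32,
        (packed >> 32) as u32,
        (packed >> 64) as u32,
        (packed >> 96) as u32,
    ]
}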
@@ -1372,31 +1365,6 @@ impl TryFrom<&DfExpr> for BinaryExpr {
     }
 }
 
-#[derive(Debug, PartialEq, Clone)]
-pub struct AggregateResults<'row_group>(Vec<AggregateResult<'row_group>>);
-
-impl<'row_group> AggregateResults<'row_group> {
-    fn len(&self) -> usize {
-        self.0.len()
-    }
-
-    fn merge(&mut self, other: &AggregateResults<'row_group>) {
-        assert_eq!(self.0.len(), other.len());
-        for (i, agg) in self.0.iter_mut().enumerate() {
-            agg.merge(&other.0[i]);
-        }
-    }
-}
-
-impl<'a> IntoIterator for AggregateResults<'a> {
-    type Item = AggregateResult<'a>;
-    type IntoIter = std::vec::IntoIter<Self::Item>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.0.into_iter()
-    }
-}
-
 // A representation of a column name.
 pub type ColumnName<'a> = &'a str;
|
@ -1545,11 +1513,6 @@ impl MetaData {
|
|||
self.columns_size += column_size;
|
||||
}
|
||||
|
||||
// Returns meta information about the column.
|
||||
fn column_meta(&self, name: ColumnName<'_>) -> &ColumnMeta {
|
||||
self.columns.get(name).unwrap()
|
||||
}
|
||||
|
||||
// Extract schema information for a set of columns.
|
||||
fn schema_for_column_names(
|
||||
&self,
|
||||
|
|
|
@@ -12,7 +12,7 @@ use snafu::{ensure, Snafu};
 
 use crate::row_group::{self, ColumnName, Predicate, RowGroup};
 use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
-use crate::value::{AggregateResult, Scalar, Value};
+use crate::value::Value;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("cannot drop last row group in table; drop table"))]
@@ -94,6 +94,8 @@ impl Table {
         row_groups.data.push(Arc::new(rg));
     }
 
+    /// TODO(edd): wire up
+    ///
     /// Remove the row group at `position` from table, returning an error if the
     /// caller has attempted to drop the last row group.
     ///
@@ -226,7 +228,7 @@ impl Table {
         predicate: Predicate,
         group_columns: &'input Selection<'_>,
         aggregates: &'input [(ColumnName<'input>, AggregateType)],
-    ) -> ReadAggregateResults {
+    ) -> Result<ReadAggregateResults> {
         let (meta, row_groups) = self.filter_row_groups(&predicate);
 
         // Filter out any column names that we do not have data for.
@@ -239,13 +241,24 @@ impl Table {
             ..ResultSchema::default()
         };
 
+        // Check all grouping columns are valid for grouping operation.
+        for (ct, _) in &schema.group_columns {
+            ensure!(
+                matches!(ct, ColumnType::Tag(_)),
+                UnsupportedColumnOperation {
+                    msg: format!("column type must be ColumnType::Tag, got {:?}", ct),
+                    column_name: ct.as_str().to_string(),
+                },
+            )
+        }
+
         // return the iterator to build the results.
-        ReadAggregateResults {
+        Ok(ReadAggregateResults {
             schema,
             predicate,
             row_groups,
             ..Default::default()
-        }
+        })
     }
 
     /// Returns aggregates segmented by grouping keys and windowed by time.
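snafu's `ensure!`, used above, is an early-return guard: when the condition is false it builds the error from the context selector and returns `Err` from the enclosing function. A sketch of roughly equivalent standalone code, assuming snafu's 0.6-era selectors and a simplified error enum (`check_groupable` is hypothetical):

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("unsupported operation on column '{}': {}", column_name, msg))]
    UnsupportedColumnOperation { msg: String, column_name: String },
}

fn check_groupable(column_name: &str, is_tag: bool) -> Result<(), Error> {
    // Expands to roughly:
    //   if !is_tag { return UnsupportedColumnOperation { .. }.fail(); }
    ensure!(
        is_tag,
        UnsupportedColumnOperation {
            msg: format!("column type must be ColumnType::Tag, got '{}'", column_name),
            column_name,
        }
    );
    Ok(())
}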
@@ -273,76 +286,15 @@ impl Table {
         _group_columns: Vec<ColumnName<'a>>,
         _aggregates: Vec<(ColumnName<'a>, AggregateType)>,
         _window: i64,
-    ) -> BTreeMap<Vec<String>, Vec<(ColumnName<'a>, AggregateResult<'_>)>> {
+    ) -> BTreeMap<Vec<String>, Vec<(ColumnName<'a>, ReadAggregateResults)>> {
         // identify segments where time range and predicates match could match
         // using segment meta data, and then execute against those segments and
         // merge results.
         todo!()
     }
 
-    // Perform aggregates without any grouping. Filtering on optional predicates
-    // and time range is still supported.
-    fn read_aggregate_no_group<'a>(
-        &self,
-        time_range: (i64, i64),
-        predicates: &[(&str, &str)],
-        aggregates: Vec<(ColumnName<'a>, AggregateType)>,
-    ) -> Vec<(ColumnName<'a>, AggregateResult<'_>)> {
-        // The fast path where there are no predicates or a time range to apply.
-        // We just want the equivalent of column statistics.
-        if predicates.is_empty() {
-            let mut results = Vec::with_capacity(aggregates.len());
-            for (col_name, agg_type) in &aggregates {
-                match agg_type {
-                    AggregateType::Count => {
-                        results.push((
-                            col_name,
-                            AggregateResult::Count(self.count(col_name, time_range)),
-                        ));
-                    }
-                    AggregateType::First => {
-                        results.push((
-                            col_name,
-                            AggregateResult::First(self.first(col_name, time_range.0)),
-                        ));
-                    }
-                    AggregateType::Last => {
-                        results.push((
-                            col_name,
-                            AggregateResult::Last(self.last(col_name, time_range.1)),
-                        ));
-                    }
-                    AggregateType::Min => {
-                        results.push((
-                            col_name,
-                            AggregateResult::Min(self.min(col_name, time_range)),
-                        ));
-                    }
-                    AggregateType::Max => {
-                        results.push((
-                            col_name,
-                            AggregateResult::Max(self.max(col_name, time_range)),
-                        ));
-                    }
-                    AggregateType::Sum => {
-                        let res = match self.sum(col_name, time_range) {
-                            Some(x) => x,
-                            None => Scalar::Null,
-                        };
-
-                        results.push((col_name, AggregateResult::Sum(res)));
-                    }
-                }
-            }
-        }
-
-        // Otherwise we have predicates so for each segment we will execute a
-        // generalised aggregation method and build up the result set.
-        todo!();
-    }
-
     //
-    // ---- Fast-path aggregations on single columns.
+    // ---- Fast-path first/last selectors.
     //
 
     // Returns the first value for the specified column across the table
@@ -387,44 +339,6 @@ impl Table {
         todo!();
     }
 
-    /// The minimum non-null value in the column for the table.
-    fn min(&self, _column_name: &str, _time_range: (i64, i64)) -> Value<'_> {
-        // Loop over segments, skipping any that don't satisfy the time range.
-        // Any segments completely overlapped can have a candidate min taken
-        // directly from their zone map. Partially overlapped segments will be
-        // read using the appropriate execution API.
-        //
-        // Return the min of minimums.
-        todo!();
-    }
-
-    /// The maximum non-null value in the column for the table.
-    fn max(&self, _column_name: &str, _time_range: (i64, i64)) -> Value<'_> {
-        // Loop over segments, skipping any that don't satisfy the time range.
-        // Any segments completely overlapped can have a candidate max taken
-        // directly from their zone map. Partially overlapped segments will be
-        // read using the appropriate execution API.
-        //
-        // Return the max of maximums.
-        todo!();
-    }
-
-    /// The number of non-null values in the column for the table.
-    fn count(&self, _column_name: &str, _time_range: (i64, i64)) -> u64 {
-        // Loop over segments, skipping any that don't satisfy the time range.
-        // Execute appropriate aggregation call on each segment and aggregate
-        // the results.
-        todo!();
-    }
-
-    /// The total sum of non-null values in the column for the table.
-    fn sum(&self, _column_name: &str, _time_range: (i64, i64)) -> Option<Scalar> {
-        // Loop over segments, skipping any that don't satisfy the time range.
-        // Execute appropriate aggregation call on each segment and aggregate
-        // the results.
-        todo!();
-    }
-
     //
     // ---- Schema API queries
     //
@@ -500,36 +414,6 @@ impl Table {
         Ok(dst)
     }
 
-    /// Determines if this table could satisfy the provided predicate.
-    ///
-    /// `false` is proof that no row within this table would match the
-    /// predicate, whilst `true` indicates one or more rows *might* match the
-    /// predicate.
-    fn could_satisfy_predicate(&self, predicate: &Predicate) -> bool {
-        // Get a snapshot of the table data under a read lock.
-        let (meta, row_groups) = {
-            let table_data = self.table_data.read().unwrap();
-            (Arc::clone(&table_data.meta), table_data.data.to_vec())
-        };
-
-        // if the table doesn't have a column for one of the predicate's
-        // expressions then the table cannot satisfy the predicate.
-        if !predicate
-            .iter()
-            .all(|expr| meta.columns.contains_key(expr.column()))
-        {
-            return false;
-        }
-
-        // If there is a single row group in the table that could satisfy the
-        // predicate then the table itself could satisfy the predicate so return
-        // true. If none of the row groups could match then return false.
-        let exprs = predicate.expressions();
-        row_groups
-            .iter()
-            .any(|row_group| row_group.could_satisfy_conjunctive_binary_expressions(exprs))
-    }
-
     /// Determines if this table contains one or more rows that satisfy the
     /// predicate.
     pub fn satisfies_predicate(&self, predicate: &Predicate) -> bool {
@@ -1191,11 +1075,13 @@ mod test {
         table.add_row_group(rg);
 
         // no predicate aggregate
-        let mut results = table.read_aggregate(
-            Predicate::default(),
-            &Selection::Some(&[]),
-            &[("time", AggregateType::Count), ("time", AggregateType::Sum)],
-        );
+        let mut results = table
+            .read_aggregate(
+                Predicate::default(),
+                &Selection::Some(&[]),
+                &[("time", AggregateType::Count), ("time", AggregateType::Sum)],
+            )
+            .unwrap();
 
         // check the column result schema
         let exp_schema = ResultSchema {
@@ -1222,17 +1108,31 @@ mod test {
         assert!(matches!(results.next_merged_result(), None));
 
         // apply a predicate
-        let mut results = table.read_aggregate(
-            Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
-            &Selection::Some(&[]),
-            &[("time", AggregateType::Count), ("time", AggregateType::Sum)],
-        );
+        let mut results = table
+            .read_aggregate(
+                Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
+                &Selection::Some(&[]),
+                &[("time", AggregateType::Count), ("time", AggregateType::Sum)],
+            )
+            .unwrap();
 
         assert_eq!(
             DisplayReadAggregateResults(vec![results.next_merged_result().unwrap()]).to_string(),
             "time_count,time_sum\n2,300\n",
         );
         assert!(matches!(results.next_merged_result(), None));
+
+        // group on wrong columns.
+        let results = table.read_aggregate(
+            Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
+            &Selection::Some(&["time"]),
+            &[("min", AggregateType::Min)],
+        );
+
+        assert!(matches!(
+            &results,
+            Err(Error::UnsupportedColumnOperation { .. })
+        ),);
     }
 
     #[test]
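The new test asserts on the error variant with `matches!`, which pattern-matches the `Result` without requiring `PartialEq` on the error type. A minimal sketch of the same assertion style (the error enum below is a simplified stand-in):

#[derive(Debug)]
enum Error {
    UnsupportedColumnOperation { msg: String, column_name: String },
}

fn main() {
    let results: Result<(), Error> = Err(Error::UnsupportedColumnOperation {
        msg: "column type must be ColumnType::Tag".to_string(),
        column_name: "time".to_string(),
    });

    // `..` ignores the variant's fields; no PartialEq impl is required.
    assert!(matches!(
        &results,
        Err(Error::UnsupportedColumnOperation { .. })
    ));
}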
@@ -1,4 +1,4 @@
-use std::{collections::BTreeSet, convert::TryFrom, fmt::Formatter};
+use std::{convert::TryFrom, fmt::Formatter};
 use std::{mem::size_of, sync::Arc};
 
 use arrow_deps::arrow;
@@ -945,330 +945,6 @@ impl From<(&AggregateType, &LogicalDataType)> for AggregateVec {
     }
 }
 
-/// These variants hold aggregates, which are the results of applying aggregates
-/// to column data.
-#[derive(Debug, Copy, Clone, PartialEq)]
-pub enum AggregateResult<'a> {
-    // Any type of column can have rows counted. NULL values do not contribute
-    // to the count. If all rows are NULL then count will be `0`.
-    Count(u64),
-
-    // Only numerical columns with scalar values can be summed. NULL values do
-    // not contribute to the sum, but if all rows are NULL then the sum is
-    // itself NULL (represented by `None`).
-    Sum(Scalar),
-
-    // The minimum value in the column data.
-    Min(Value<'a>),
-
-    // The maximum value in the column data.
-    Max(Value<'a>),
-
-    // The first value in the column data and the corresponding timestamp.
-    First(Option<(i64, Value<'a>)>),
-
-    // The last value in the column data and the corresponding timestamp.
-    Last(Option<(i64, Value<'a>)>),
-}
-
-#[allow(unused_assignments)]
-impl<'a> AggregateResult<'a> {
-    pub fn update(&mut self, other: Value<'a>) {
-        if other.is_null() {
-            // a NULL value has no effect on aggregates
-            return;
-        }
-
-        match self {
-            Self::Count(v) => {
-                if !other.is_null() {
-                    *v += 1;
-                }
-            }
-            Self::Min(v) => match (&v, &other) {
-                (Value::Null, _) => {
-                    // something is always smaller than NULL
-                    *v = other;
-                }
-                (Value::String(_), Value::Null) => {} // do nothing
-                (Value::String(a), Value::String(b)) => {
-                    if a.cmp(b) == std::cmp::Ordering::Greater {
-                        *v = other;
-                    }
-                }
-                (Value::String(a), Value::ByteArray(b)) => {
-                    if a.as_bytes().cmp(b) == std::cmp::Ordering::Greater {
-                        *v = other;
-                    }
-                }
-                (Value::ByteArray(_), Value::Null) => {} // do nothing
-                (Value::ByteArray(a), Value::String(b)) => {
-                    if a.cmp(&b.as_bytes()) == std::cmp::Ordering::Greater {
-                        *v = other;
-                    }
-                }
-                (Value::ByteArray(a), Value::ByteArray(b)) => {
-                    if a.cmp(b) == std::cmp::Ordering::Greater {
-                        *v = other;
-                    }
-                }
-                (Value::Scalar(_), Value::Null) => {} // do nothing
-                (Value::Scalar(a), Value::Scalar(b)) => {
-                    if a > b {
-                        *v = other;
-                    }
-                }
-                (_, _) => unreachable!("not a possible variant combination"),
-            },
-            Self::Max(v) => match (&v, &other) {
-                (Value::Null, _) => {
-                    // something is always larger than NULL
-                    *v = other;
-                }
-                (Value::String(_), Value::Null) => {} // do nothing
-                (Value::String(a), Value::String(b)) => {
-                    if a.cmp(b) == std::cmp::Ordering::Less {
-                        *v = other;
-                    }
-                }
-                (Value::String(a), Value::ByteArray(b)) => {
-                    if a.as_bytes().cmp(b) == std::cmp::Ordering::Less {
-                        *v = other;
-                    }
-                }
-                (Value::ByteArray(_), Value::Null) => {} // do nothing
-                (Value::ByteArray(a), Value::String(b)) => {
-                    if a.cmp(&b.as_bytes()) == std::cmp::Ordering::Less {
-                        *v = other;
-                    }
-                }
-                (Value::ByteArray(a), Value::ByteArray(b)) => {
-                    if a.cmp(b) == std::cmp::Ordering::Less {
-                        *v = other;
-                    }
-                }
-                (Value::Scalar(_), Value::Null) => {} // do nothing
-                (Value::Scalar(a), Value::Scalar(b)) => {
-                    if a < b {
-                        *v = other;
-                    }
-                }
-                (_, _) => unreachable!("not a possible variant combination"),
-            },
-            Self::Sum(v) => match (&v, &other) {
-                (Scalar::Null, Value::Scalar(other_scalar)) => {
-                    // NULL + something == something
-                    *v = *other_scalar;
-                }
-                (_, Value::Scalar(b)) => *v += b,
-                (_, _) => unreachable!("not a possible variant combination"),
-            },
-            _ => unimplemented!("First and Last aggregates not implemented yet"),
-        }
-    }
-
-    /// Merge `other` into `self`
-    pub fn merge(&mut self, other: &AggregateResult<'a>) {
-        match (self, other) {
-            (AggregateResult::Count(this), AggregateResult::Count(that)) => *this += *that,
-            (AggregateResult::Sum(this), AggregateResult::Sum(that)) => *this += that,
-            (AggregateResult::Min(this), AggregateResult::Min(that)) => {
-                if *this > *that {
-                    *this = *that;
-                }
-            }
-            (AggregateResult::Max(this), AggregateResult::Max(that)) => {
-                if *this < *that {
-                    *this = *that;
-                }
-            }
-            (a, b) => unimplemented!("merging {:?} into {:?} not yet implemented", b, a),
-        }
-    }
-
-    pub fn try_as_str(&self) -> Option<&str> {
-        match &self {
-            AggregateResult::Min(v) => match v {
-                Value::Null => None,
-                Value::String(s) => Some(s),
-                v => panic!("cannot convert {:?} to &str", v),
-            },
-            AggregateResult::Max(v) => match v {
-                Value::Null => None,
-                Value::String(s) => Some(s),
-                v => panic!("cannot convert {:?} to &str", v),
-            },
-            AggregateResult::First(_) => panic!("cannot convert first tuple to &str"),
-            AggregateResult::Last(_) => panic!("cannot convert last tuple to &str"),
-            AggregateResult::Sum(v) => panic!("cannot convert {:?} to &str", v),
-            AggregateResult::Count(_) => panic!("cannot convert count to &str"),
-        }
-    }
-
-    pub fn try_as_bytes(&self) -> Option<&[u8]> {
-        match &self {
-            AggregateResult::Min(v) => match v {
-                Value::Null => None,
-                Value::ByteArray(s) => Some(s),
-                v => panic!("cannot convert {:?} to &[u8]", v),
-            },
-            AggregateResult::Max(v) => match v {
-                Value::Null => None,
-                Value::ByteArray(s) => Some(s),
-                v => panic!("cannot convert {:?} to &[u8]", v),
-            },
-            AggregateResult::First(_) => panic!("cannot convert first tuple to &[u8]"),
-            AggregateResult::Last(_) => panic!("cannot convert last tuple to &[u8]"),
-            AggregateResult::Sum(v) => panic!("cannot convert {:?} to &[u8]", v),
-            AggregateResult::Count(_) => panic!("cannot convert count to &[u8]"),
-        }
-    }
-
-    pub fn try_as_bool(&self) -> Option<bool> {
-        match &self {
-            AggregateResult::Min(v) => match v {
-                Value::Null => None,
-                Value::Boolean(s) => Some(*s),
-                v => panic!("cannot convert {:?} to bool", v),
-            },
-            AggregateResult::Max(v) => match v {
-                Value::Null => None,
-                Value::Boolean(s) => Some(*s),
-                v => panic!("cannot convert {:?} to bool", v),
-            },
-            AggregateResult::First(_) => panic!("cannot convert first tuple to bool"),
-            AggregateResult::Last(_) => panic!("cannot convert last tuple to bool"),
-            AggregateResult::Sum(v) => panic!("cannot convert {:?} to bool", v),
-            AggregateResult::Count(_) => panic!("cannot convert count to bool"),
-        }
-    }
-
-    pub fn try_as_i64_scalar(&self) -> Option<i64> {
-        match &self {
-            AggregateResult::Sum(v) => match v {
-                Scalar::Null => None,
-                Scalar::I64(v) => Some(*v),
-                v => panic!("cannot convert {:?} to i64", v),
-            },
-            AggregateResult::Min(v) => match v {
-                Value::Null => None,
-                Value::Scalar(s) => match s {
-                    Scalar::Null => None,
-                    Scalar::I64(v) => Some(*v),
-                    v => panic!("cannot convert {:?} to u64", v),
-                },
-                v => panic!("cannot convert {:?} to i64", v),
-            },
-            AggregateResult::Max(v) => match v {
-                Value::Null => None,
-                Value::Scalar(s) => match s {
-                    Scalar::Null => None,
-                    Scalar::I64(v) => Some(*v),
-                    v => panic!("cannot convert {:?} to u64", v),
-                },
-                v => panic!("cannot convert {:?} to i64", v),
-            },
-            AggregateResult::First(_) => panic!("cannot convert first tuple to scalar"),
-            AggregateResult::Last(_) => panic!("cannot convert last tuple to scalar"),
-            AggregateResult::Count(_) => panic!("cannot represent count as i64"),
-        }
-    }
-
-    pub fn try_as_u64_scalar(&self) -> Option<u64> {
-        match &self {
-            AggregateResult::Sum(v) => match v {
-                Scalar::Null => None,
-                Scalar::U64(v) => Some(*v),
-                v => panic!("cannot convert {:?} to u64", v),
-            },
-            AggregateResult::Count(c) => Some(*c),
-            AggregateResult::Min(v) => match v {
-                Value::Null => None,
-                Value::Scalar(s) => match s {
-                    Scalar::Null => None,
-                    Scalar::U64(v) => Some(*v),
-                    v => panic!("cannot convert {:?} to u64", v),
-                },
-                v => panic!("cannot convert {:?} to u64", v),
-            },
-            AggregateResult::Max(v) => match v {
-                Value::Null => None,
-                Value::Scalar(s) => match s {
-                    Scalar::Null => None,
-                    Scalar::U64(v) => Some(*v),
-                    v => panic!("cannot convert {:?} to u64", v),
-                },
-                v => panic!("cannot convert {:?} to u64", v),
-            },
-            AggregateResult::First(_) => panic!("cannot convert first tuple to scalar"),
-            AggregateResult::Last(_) => panic!("cannot convert last tuple to scalar"),
-        }
-    }
-
-    pub fn try_as_f64_scalar(&self) -> Option<f64> {
-        match &self {
-            AggregateResult::Sum(v) => match v {
-                Scalar::Null => None,
-                Scalar::F64(v) => Some(*v),
-                v => panic!("cannot convert {:?} to f64", v),
-            },
-            AggregateResult::Min(v) => match v {
-                Value::Null => None,
-                Value::Scalar(s) => match s {
-                    Scalar::Null => None,
-                    Scalar::F64(v) => Some(*v),
-                    v => panic!("cannot convert {:?} to f64", v),
-                },
-                v => panic!("cannot convert {:?} to f64", v),
-            },
-            AggregateResult::Max(v) => match v {
-                Value::Null => None,
-                Value::Scalar(s) => match s {
-                    Scalar::Null => None,
-                    Scalar::F64(v) => Some(*v),
-                    v => panic!("cannot convert {:?} to f64", v),
-                },
-                v => panic!("cannot convert {:?} to f64", v),
-            },
-            AggregateResult::First(_) => panic!("cannot convert first tuple to scalar"),
-            AggregateResult::Last(_) => panic!("cannot convert last tuple to scalar"),
-            AggregateResult::Count(_) => panic!("cannot represent count as f64"),
-        }
-    }
-}
-
-impl From<&AggregateType> for AggregateResult<'_> {
-    fn from(typ: &AggregateType) -> Self {
-        match typ {
-            AggregateType::Count => Self::Count(0),
-            AggregateType::First => Self::First(None),
-            AggregateType::Last => Self::Last(None),
-            AggregateType::Min => Self::Min(Value::Null),
-            AggregateType::Max => Self::Max(Value::Null),
-            AggregateType::Sum => Self::Sum(Scalar::Null),
-        }
-    }
-}
-
-impl std::fmt::Display for AggregateResult<'_> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            AggregateResult::Count(v) => write!(f, "{}", v),
-            AggregateResult::First(v) => match v {
-                Some((_, v)) => write!(f, "{}", v),
-                None => write!(f, "NULL"),
-            },
-            AggregateResult::Last(v) => match v {
-                Some((_, v)) => write!(f, "{}", v),
-                None => write!(f, "NULL"),
-            },
-            AggregateResult::Min(v) => write!(f, "{}", v),
-            AggregateResult::Max(v) => write!(f, "{}", v),
-            AggregateResult::Sum(v) => write!(f, "{}", v),
-        }
-    }
-}
-
 /// A scalar is a numerical value that can be aggregated.
 #[derive(Debug, PartialEq, PartialOrd, Copy, Clone)]
 pub enum Scalar {
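For context on why the whole `AggregateResult` enum could be deleted: judging from the earlier hunks (`dst.aggregate_cols`, `AggregateVec::from((agg_type, data_type))`, `agg_vec.push(...)`), the read buffer now accumulates aggregates column-wise, one vector of values per aggregate, instead of merging per-row scalar results pairwise. A toy illustration of the two shapes; the type names are hypothetical, not the crate's:

// Row-oriented (old shape): one enum value per aggregate, merged pairwise.
#[derive(Debug)]
enum RowAggregate {
    Count(u64),
    Sum(i64),
}

// Column-oriented (new shape): each aggregate owns a vector with one slot
// per output group, so results append and merge as whole columns.
#[derive(Debug)]
enum ColumnAggregate {
    Count(Vec<u64>),
    Sum(Vec<i64>),
}

fn main() {
    // Old: merging two chunks' counts meant updating a scalar in place.
    let merged = match (RowAggregate::Count(2), RowAggregate::Count(3)) {
        (RowAggregate::Count(a), RowAggregate::Count(b)) => RowAggregate::Count(a + b),
        _ => unreachable!("mismatched aggregate kinds"),
    };
    println!("{:?}", merged);

    // New: each group's value is just another element in the column.
    let col = ColumnAggregate::Count(vec![2, 3]);
    println!("{:?}", col);
}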
@@ -1837,15 +1513,6 @@ impl<'a> Iterator for ValuesIterator<'a> {
     }
 }
 
-#[derive(PartialEq, Debug)]
-pub enum ValueSet<'a> {
-    // UTF-8 valid unicode strings
-    String(BTreeSet<Option<&'a String>>),
-
-    // Arbitrary collections of bytes
-    ByteArray(BTreeSet<Option<&'a [u8]>>),
-}
-
 #[derive(Debug, PartialEq)]
 /// A representation of encoded values for a column.
 pub enum EncodedValues {