diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs
index 8bf66bddc1..7260754dc1 100644
--- a/read_buffer/src/column.rs
+++ b/read_buffer/src/column.rs
@@ -330,7 +330,7 @@ impl Column {
         // Check the column for all rows that satisfy the predicate.
         let row_ids = match &self {
-            Self::String(_, data) => data.row_ids_filter(op, value.string(), dst),
+            Self::String(_, data) => data.row_ids_filter(op, value.str(), dst),
             Self::Float(_, data) => data.row_ids_filter(op, value.scalar(), dst),
             Self::Integer(_, data) => data.row_ids_filter(op, value.scalar(), dst),
             Self::Unsigned(_, data) => data.row_ids_filter(op, value.scalar(), dst),
diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs
index e2e9e6f6cd..db4bdf7b11 100644
--- a/read_buffer/src/row_group.rs
+++ b/read_buffer/src/row_group.rs
@@ -514,11 +514,11 @@ impl RowGroup {
         // single 128-bit integer as the group key. If grouping is on more than
         // four columns then a fallback to using a vector as a key will happen.
         if dst.schema.group_columns.len() <= 4 {
-            self.read_group_hash_with_u128_key(dst, &groupby_encoded_ids, &aggregate_columns_data);
+            self.read_group_hash_with_u128_key(dst, &groupby_encoded_ids, aggregate_columns_data);
             return;
         }
 
-        self.read_group_hash_with_vec_key(dst, &groupby_encoded_ids, &aggregate_columns_data);
+        self.read_group_hash_with_vec_key(dst, &groupby_encoded_ids, aggregate_columns_data);
     }
 
     // This function is used with `read_group_hash` when the number of columns
@@ -528,7 +528,7 @@ impl RowGroup {
         &'a self,
         dst: &mut ReadAggregateResult<'a>,
         groupby_encoded_ids: &[Vec<u32>],
-        aggregate_input_columns: &[Values<'a>],
+        aggregate_input_columns: Vec<Values<'a>>,
     ) {
         let total_rows = groupby_encoded_ids[0].len();
         assert!(groupby_encoded_ids.iter().all(|x| x.len() == total_rows));
@@ -637,7 +637,7 @@ impl RowGroup {
         &'a self,
         dst: &mut ReadAggregateResult<'a>,
         groupby_encoded_ids: &[Vec<u32>],
-        aggregate_input_columns: &[Values<'a>],
+        aggregate_input_columns: Vec<Values<'a>>,
     ) {
         let total_rows = groupby_encoded_ids[0].len();
         assert!(groupby_encoded_ids.iter().all(|x| x.len() == total_rows));
@@ -782,6 +782,8 @@ impl RowGroup {
             .map(|(_, agg_type, data_type)| AggregateVec::from((agg_type, data_type, 0)))
             .collect::<Vec<_>>();
 
+        let mut output_rows = 0;
+
         // multi_cartesian_product will create the cartesian product of all
         // grouping-column values. This is likely going to be more group keys
         // than there exists row-data for, so don't materialise them yet...
@@ -837,6 +839,7 @@ impl RowGroup {
             // There exist rows for this group key combination. Materialise the
             // group key and calculate the aggregates for this key using the set
             // of row IDs.
+            output_rows += 1;
 
             // Add decoded group key values to the output group columns.
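// ---- editor's aside (illustrative sketch, not part of this diff) ----
// A minimal sketch of the u128 group-key packing described above, assuming
// each encoded column value fits in a u32; the function name and layout are
// hypothetical, for illustration only.
fn pack_group_key(encoded_ids: &[u32]) -> u128 {
    // At most four 32-bit encoded IDs fit into one 128-bit key.
    assert!(encoded_ids.len() <= 4);
    encoded_ids
        .iter()
        .enumerate()
        .fold(0u128, |key, (i, &id)| key | ((id as u128) << (32 * i)))
}

fn main() {
    // A row whose two group columns encode to IDs 2 and 1 is hashed under a
    // single integer key rather than a heap-allocated Vec.
    assert_eq!(pack_group_key(&[2, 1]), (1u128 << 32) | 2);
}
// ---- end aside ----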
for (group_col_i, col) in group_cols_out.iter_mut().enumerate() { @@ -856,20 +859,33 @@ impl RowGroup { match typ { AggregateType::Count => { let agg = agg_col.count(&group_key_row_ids.to_vec()) as u64; - agg_cols_out[agg_col_i].push(&Value::Scalar(Scalar::U64(agg))) + agg_cols_out[agg_col_i].push(Value::Scalar(Scalar::U64(agg))) } AggregateType::First => {} AggregateType::Last => {} - AggregateType::Min => {} - AggregateType::Max => {} + AggregateType::Min => { + let agg = agg_col.min(&group_key_row_ids.to_vec()); + agg_cols_out[agg_col_i].push(agg); + } + AggregateType::Max => { + let agg = agg_col.max(&group_key_row_ids.to_vec()); + agg_cols_out[agg_col_i].push(agg); + } AggregateType::Sum => { let agg = agg_col.sum(&group_key_row_ids.to_vec()); - agg_cols_out[agg_col_i].push(&Value::Scalar(agg)); + agg_cols_out[agg_col_i].push(Value::Scalar(agg)); } } } } + for col in &group_cols_out { + assert_eq!(col.len(), output_rows); + } + for col in &agg_cols_out { + assert_eq!(col.len(), output_rows); + } + dst.group_key_cols = group_cols_out; dst.aggregate_cols = agg_cols_out; } @@ -890,13 +906,6 @@ impl RowGroup { // Applies aggregates on multiple columns with an optional predicate. fn aggregate_columns<'a>(&'a self, predicate: &Predicate, dst: &mut ReadAggregateResult<'a>) { - let aggregate_columns = dst - .schema - .aggregate_columns - .iter() - .map(|(col_type, agg_type, _)| (self.column_by_name(col_type.as_str()), *agg_type)) - .collect::>(); - let row_ids = match predicate.is_empty() { true => { // TODO(edd): PERF - teach each column encoding how to produce @@ -916,27 +925,38 @@ impl RowGroup { }, }; - // the single row that will store the aggregate column values. - let mut aggregate_row = vec![]; - for (col, agg_type) in aggregate_columns { + // References to the columns to be used as input for producing the + // output aggregates. Also returns the required aggregate type. 
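// ---- editor's aside (illustrative sketch, not part of this diff) ----
// The refactor below replaces the row-wise `aggregate_row` with one output
// vector per aggregate. A reduced sketch of that shape for a single nullable
// i64 input column; all names here are hypothetical.
fn aggregate_column(col: &[Option<i64>], row_ids: &[usize]) -> (Option<i64>, u64) {
    let (mut sum, mut count) = (None, 0u64);
    for &id in row_ids {
        if let Some(v) = col[id] {
            sum = Some(sum.unwrap_or(0) + v);
            count += 1; // NULLs contribute to neither aggregate
        }
    }
    (sum, count)
}

fn main() {
    let col = vec![Some(10), None, Some(20)];
    assert_eq!(aggregate_column(&col, &[0, 1, 2]), (Some(30), 2));
}
// ---- end aside ----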
+        let input_aggregate_columns = dst
+            .schema
+            .aggregate_columns
+            .iter()
+            .map(|(col_type, agg_type, _)| (self.column_by_name(col_type.as_str()), *agg_type))
+            .collect::<Vec<_>>();
+
+        let mut output_aggregate_columns = dst
+            .schema
+            .aggregate_columns
+            .iter()
+            .map(|(_, agg_type, data_type)| AggregateVec::from((agg_type, data_type, 0)))
+            .collect::<Vec<_>>();
+
+        for (i, (col, agg_type)) in input_aggregate_columns.iter().enumerate() {
             match agg_type {
                 AggregateType::Count => {
-                    aggregate_row.push(AggregateResult::Count(col.count(&row_ids) as u64));
-                }
-                AggregateType::Sum => {
-                    aggregate_row.push(AggregateResult::Sum(col.sum(&row_ids)));
-                }
-                AggregateType::Min => {
-                    aggregate_row.push(AggregateResult::Min(col.min(&row_ids)));
-                }
-                AggregateType::Max => {
-                    aggregate_row.push(AggregateResult::Max(col.max(&row_ids)));
+                    let value = Value::Scalar(Scalar::U64(col.count(&row_ids) as u64));
+                    output_aggregate_columns[i].push(value);
                 }
                 AggregateType::First => unimplemented!("First not yet implemented"),
                 AggregateType::Last => unimplemented!("Last not yet implemented"),
+                AggregateType::Min => output_aggregate_columns[i].push(col.min(&row_ids)),
+                AggregateType::Max => output_aggregate_columns[i].push(col.max(&row_ids)),
+                AggregateType::Sum => {
+                    output_aggregate_columns[i].push(Value::Scalar(col.sum(&row_ids)))
+                }
             }
         }
-        dst.aggregates.push(AggregateResults(aggregate_row)); // write the row
+        dst.aggregate_cols = output_aggregate_columns;
     }
 
     /// Given the predicate (which may be empty), determine a set of rows
@@ -1353,58 +1373,6 @@ impl TryFrom<&DfExpr> for BinaryExpr {
     }
 }
 
-// A GroupKey is an ordered collection of row values. The order determines which
-// columns the values originated from.
-#[derive(PartialEq, PartialOrd, Clone)]
-pub struct GroupKey<'row_group>(Vec<Value<'row_group>>);
-
-impl GroupKey<'_> {
-    fn len(&self) -> usize {
-        self.0.len()
-    }
-}
-
-impl<'a> From<Vec<Value<'a>>> for GroupKey<'a> {
-    fn from(values: Vec<Value<'a>>) -> Self {
-        Self(values)
-    }
-}
-
-impl Eq for GroupKey<'_> {}
-
-// Implementing the `Ord` trait on `GroupKey` means that collections of group
-// keys become sortable. This is typically useful for tests because, depending
-// on the implementation, group keys are not always emitted in sorted order.
-//
-// To be compared, group keys *must* have the same length, or `cmp` will panic.
-// They will be ordered as follows:
-//
-//    [foo, zoo, zoo], [foo, bar, zoo], [bar, bar, bar], [bar, bar, zoo],
-//
-// becomes:
-//
-//    [bar, bar, bar], [bar, bar, zoo], [foo, bar, zoo], [foo, zoo, zoo],
-//
-// Be careful sorting group keys in result sets, because other columns
-// associated with the group keys won't be sorted unless the correct `sort`
-// methods are used on the result set implementations.
-impl Ord for GroupKey<'_> {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        // two group keys must have same length
-        assert_eq!(self.0.len(), other.0.len());
-
-        let cols = self.0.len();
-        for i in 0..cols {
-            match self.0[i].partial_cmp(&other.0[i]) {
-                Some(ord) => return ord,
-                None => continue,
-            }
-        }
-
-        std::cmp::Ordering::Equal
-    }
-}
-
 #[derive(Debug, PartialEq, Clone)]
 pub struct AggregateResults<'row_group>(Vec<AggregateResult<'row_group>>);
 
@@ -1712,61 +1680,68 @@ pub struct ReadAggregateResult<'row_group> {
     // a schema describing the columns in the results and their types.
     pub(crate) schema: ResultSchema,
 
-    // row-wise collection of group keys. Each group key contains column-wise
-    // values for each of the groupby_columns.
-    pub(crate) group_keys: Vec<GroupKey<'row_group>>,
-
-    // row-wise collection of aggregates.
Each aggregate contains column-wise
-    // values for each of the aggregate_columns.
-    pub(crate) aggregates: Vec<AggregateResults<'row_group>>,
-
+    // The collection of columns forming the group keys.
     pub(crate) group_key_cols: Vec<Vec<Option<&'row_group str>>>,
-    pub(crate) aggregate_cols: Vec<AggregateVec<'row_group>>,
+
+    // The collection of aggregate columns. Each value in each column is an
+    // aggregate associated with the group key built from values in the group
+    // columns at the same ordinal position.
+    pub(crate) aggregate_cols: Vec<AggregateVec>,
 
     pub(crate) group_keys_sorted: bool,
 }
 
 impl<'row_group> ReadAggregateResult<'row_group> {
-    fn with_capacity(schema: ResultSchema, capacity: usize) -> Self {
+    pub fn new(schema: ResultSchema) -> Self {
         Self {
             schema,
-            group_keys: Vec::with_capacity(capacity),
-            aggregates: Vec::with_capacity(capacity),
             ..Default::default()
         }
     }
 
+    /// A `ReadAggregateResult` is empty if there are no aggregate columns.
     pub fn is_empty(&self) -> bool {
-        self.aggregates.is_empty()
+        self.aggregate_cols.is_empty()
     }
 
     pub fn schema(&self) -> &ResultSchema {
         &self.schema
     }
 
-    // The number of rows in the result.
+    /// The number of rows in the result.
     pub fn rows(&self) -> usize {
-        self.aggregates.len()
+        if self.aggregate_cols.is_empty() {
+            return 0;
+        }
+        self.aggregate_cols[0].len()
     }
 
-    // The number of distinct group keys in the result.
+    // The number of distinct group keys in the result. Not the same as `rows()`
+    // because a `ReadAggregateResult` can have no group keys and have a single
+    // aggregate row.
    pub fn cardinality(&self) -> usize {
-        self.group_keys.len()
+        if self.group_key_cols.is_empty() {
+            return 0;
+        }
+        self.group_key_cols[0].len()
    }
 
     // Is this result for a grouped aggregate?
     pub fn is_grouped_aggregate(&self) -> bool {
-        !self.group_keys.is_empty()
+        !self.group_key_cols.is_empty()
+    }
+
+    // The number of grouping columns.
+    pub fn group_key_columns(&self) -> usize {
+        self.group_key_cols.len()
     }
 
     // Whether or not the rows in the results are sorted by group keys or not.
     pub fn group_keys_sorted(&self) -> bool {
-        self.group_keys.is_empty() || self.group_keys_sorted
+        self.group_key_cols.is_empty() || self.group_keys_sorted
     }
 
     /// Merges `other` and self, returning a new set of results.
-    ///
-    /// NOTE: This is slow! Not expected to be the final type of implementation
     pub fn merge(
         mut self,
         mut other: ReadAggregateResult<'row_group>,
@@ -1790,194 +1765,355 @@ impl<'row_group> ReadAggregateResult<'row_group> {
             other.sort();
         }
 
-        let self_group_keys = self.cardinality();
-        let self_len = self.rows();
-        let other_len = other.rows();
-        let mut result = Self::with_capacity(self.schema, self_len.max(other_len));
+        let mut result = Self::new(self.schema.clone());
+        // Allocate output grouping columns
+        result
+            .group_key_cols
+            .resize(result.schema.group_columns.len(), vec![]);
+
+        // Allocate output aggregate columns
+        result.aggregate_cols = result
+            .schema
+            .aggregate_columns
+            .iter()
+            .map(|(_, agg_type, data_type)| AggregateVec::from((agg_type, data_type, 0)))
+            .collect::<Vec<_>>();
+
+        let mut self_i = 0;
+        let mut other_i = 0;
+        while self_i < self.rows() || other_i < other.rows() {
+            if self_i == self.rows() {
+                // drained self, add the rest of other's group key columns
+                for (col_i, col) in result.group_key_cols.iter_mut().enumerate() {
+                    col.extend(other.group_key_cols[col_i].iter().skip(other_i));
+                }
+
+                // add the rest of other's aggregate columns
+                //
+                // N.B - by checking the data type of the aggregate columns here
+                // we can do type checking on a column basis (once) rather than
+                // for each row.
This allows us to extract an aggregate vec + // and an iterator of the same type to extend the aggregate vec. + for (col_i, (_, _, data_type)) in result.schema.aggregate_columns.iter().enumerate() + { + match data_type { + LogicalDataType::Integer => { + let itr = other.aggregate_cols[col_i].as_i64().iter().cloned(); + result.aggregate_cols[col_i].extend_with_i64(itr); + } + LogicalDataType::Unsigned => { + let itr = other.aggregate_cols[col_i].as_u64().iter().cloned(); + result.aggregate_cols[col_i].extend_with_u64(itr); + } + LogicalDataType::Float => { + let itr = other.aggregate_cols[col_i].as_f64().iter().cloned(); + result.aggregate_cols[col_i].extend_with_f64(itr); + } + LogicalDataType::String => { + let itr = other.aggregate_cols[col_i].as_str().iter().cloned(); + result.aggregate_cols[col_i].extend_with_str(itr); + } + LogicalDataType::Binary => { + let itr = other.aggregate_cols[col_i].as_bytes().iter().cloned(); + result.aggregate_cols[col_i].extend_with_bytes(itr); + } + LogicalDataType::Boolean => { + let itr = other.aggregate_cols[col_i].as_bool().iter().cloned(); + result.aggregate_cols[col_i].extend_with_bool(itr); + } + } + } - let mut i: usize = 0; - let mut j: usize = 0; - while i < self_len || j < other_len { - if i >= self_len { - // drained self, add the rest of other - result - .group_keys - .extend(other.group_keys.iter().skip(j).cloned()); - result - .aggregates - .extend(other.aggregates.iter().skip(j).cloned()); return result; - } else if j >= other_len { - // drained other, add the rest of self - result - .group_keys - .extend(self.group_keys.iter().skip(j).cloned()); - result - .aggregates - .extend(self.aggregates.iter().skip(j).cloned()); + } else if other_i == other.rows() { + // drained other, add the rest of self's group key columns + for (col_i, col) in result.group_key_cols.iter_mut().enumerate() { + col.extend(self.group_key_cols[col_i].iter().skip(self_i)); + } + + // add the rest of self's aggregate columns + for (col_i, (_, _, data_type)) in result.schema.aggregate_columns.iter().enumerate() + { + match data_type { + LogicalDataType::Integer => { + let itr = self.aggregate_cols[col_i].as_i64().iter().cloned(); + result.aggregate_cols[col_i].extend_with_i64(itr); + } + LogicalDataType::Unsigned => { + let itr = self.aggregate_cols[col_i].as_u64().iter().cloned(); + result.aggregate_cols[col_i].extend_with_u64(itr); + } + LogicalDataType::Float => { + let itr = self.aggregate_cols[col_i].as_f64().iter().cloned(); + result.aggregate_cols[col_i].extend_with_f64(itr); + } + LogicalDataType::String => { + let itr = self.aggregate_cols[col_i].as_str().iter().cloned(); + result.aggregate_cols[col_i].extend_with_str(itr); + } + LogicalDataType::Binary => { + let itr = self.aggregate_cols[col_i].as_bytes().iter().cloned(); + result.aggregate_cols[col_i].extend_with_bytes(itr); + } + LogicalDataType::Boolean => { + let itr = self.aggregate_cols[col_i].as_bool().iter().cloned(); + result.aggregate_cols[col_i].extend_with_bool(itr); + } + } + } + return result; } - // just merge the aggregate if there are no group keys - if self_group_keys == 0 { - assert!((self_len == other_len) && self_len == 1); // there should be a single aggregate row - self.aggregates[i].merge(&other.aggregates[j]); - result.aggregates.push(self.aggregates[i].clone()); - return result; + // compare the next row in self and other and determine if there is + // a clear lexicographic order. 
+ let mut ord = Ordering::Equal; + for i in 0..result.schema.group_columns.len() { + match self.group_key_cols[i][self_i].partial_cmp(&other.group_key_cols[i][other_i]) + { + Some(o) => { + ord = o; + if !matches!(ord, Ordering::Equal) { + break; + } + } + None => continue, + } } - // there are group keys so merge them. - match self.group_keys[i].cmp(&other.group_keys[j]) { + match ord { Ordering::Less => { - result.group_keys.push(self.group_keys[i].clone()); - result.aggregates.push(self.aggregates[i].clone()); - i += 1; + // move the next row for each of self's columns onto result. + for (col_i, col) in result.group_key_cols.iter_mut().enumerate() { + col.push(self.group_key_cols[col_i][self_i]); + } + for (col_i, col) in result.aggregate_cols.iter_mut().enumerate() { + col.push(self.aggregate_cols[col_i].value(self_i)); + } + self_i += 1; } Ordering::Equal => { - // merge aggregates - self.aggregates[i].merge(&other.aggregates[j]); - result.group_keys.push(self.group_keys[i].clone()); - result.aggregates.push(self.aggregates[i].clone()); - i += 1; - j += 1; + // move the next row for each of self's columns onto result. + for (col_i, col) in result.group_key_cols.iter_mut().enumerate() { + col.push(self.group_key_cols[col_i][self_i]); + } + + // merge all the aggregates for this group key. + for (col_i, col) in result.aggregate_cols.iter_mut().enumerate() { + let self_value = self.aggregate_cols[col_i].value(self_i); + let other_value = other.aggregate_cols[col_i].value(other_i); + let (_, agg_type, _) = &self.schema.aggregate_columns[col_i]; + col.push(match agg_type { + AggregateType::Count => self_value + other_value, + AggregateType::Min => match self_value.partial_cmp(&other_value) { + Some(ord) => match ord { + Ordering::Less => self_value, + Ordering::Equal => self_value, + Ordering::Greater => other_value, + }, + None => self_value, + }, + AggregateType::Max => match self_value.partial_cmp(&other_value) { + Some(ord) => match ord { + Ordering::Less => other_value, + Ordering::Equal => other_value, + Ordering::Greater => self_value, + }, + None => self_value, + }, + AggregateType::Sum => self_value + other_value, + _ => unimplemented!("first/last not implemented"), + }); + } + self_i += 1; + other_i += 1; } Ordering::Greater => { - result.group_keys.push(other.group_keys[j].clone()); - result.aggregates.push(other.aggregates[j].clone()); - j += 1; + // move the next row for each of other's columns onto result. + for (col_i, col) in result.group_key_cols.iter_mut().enumerate() { + col.push(other.group_key_cols[col_i][other_i]); + } + for (col_i, col) in result.aggregate_cols.iter_mut().enumerate() { + col.push(other.aggregate_cols[col_i].value(other_i)); + } + other_i += 1; } } } - result } - /// Executes a mutable sort of the rows in the result set based on the - /// lexicographic order of each group key column. - /// - /// TODO(edd): this has really poor performance. It clones the underlying - /// vectors rather than sorting them in place. - pub fn sort(&mut self) { - // The permutation crate lets you execute a sort on anything implements - // `Ord` and return the sort order, which can then be applied to other - // columns. - let perm = permutation::sort(self.group_keys.as_slice()); - self.group_keys = perm.apply_slice(self.group_keys.as_slice()); - self.aggregates = perm.apply_slice(self.aggregates.as_slice()); + // Executes a mutable sort of the results based on the lexicographic order + // of each group key columns. 
+    //
+    // Given these group key columns:
+    //
+    //    [foo  [zoo  [zoo
+    //     foo   bar   zoo
+    //     bar   bar   bar
+    //     bar]  bar]  zoo]
+    //
+    // `sort` would result in them becoming:
+    //
+    //    [bar  [bar  [bar
+    //     bar   bar   zoo
+    //     foo   bar   zoo
+    //     foo]  zoo]  zoo]
+    //
+    // The same permutation is also applied to the aggregate columns.
+    //
+    fn sort(&mut self) {
+        if self.group_keys_sorted {
+            return;
+        }
+
+        // Create a vector of group keys, which allows us to determine a
+        // permutation by which we should sort all columns.
+        let mut group_keys = (0..self.rows())
+            .map(|i| GroupKey::new(&self.group_key_cols, i))
+            .collect::<Vec<_>>();
+
+        // sort the vector of group keys, which will give us a permutation
+        // that we can apply to all of the columns.
+        group_keys.sort_unstable_by(|a, b| {
+            let cols = a.len();
+            for i in 0..cols {
+                match a.columns[i][a.row_offset].partial_cmp(&b.columns[i][b.row_offset]) {
+                    Some(ord) => {
+                        if matches!(ord, Ordering::Equal) {
+                            continue;
+                        }
+                        return ord;
+                    }
+                    None => continue,
+                }
+            }
+
+            std::cmp::Ordering::Equal
+        });
+
+        // Now create a permutation by looking at how the row_offsets have been
+        // ordered in the `group_keys` array.
+        let perm = permutation::Permutation::from_vec(
+            group_keys
+                .iter()
+                .map(|gk| gk.row_offset)
+                .collect::<Vec<_>>(),
+        );
+        assert_eq!(perm.len(), self.rows());
+
+        // Apply that permutation to all of the columns.
+        for col in self.group_key_cols.iter_mut() {
+            *col = perm.apply_slice(col.as_slice());
+        }
+
+        for col in self.aggregate_cols.iter_mut() {
+            col.sort_with_permutation(&perm);
+        }
+        self.group_keys_sorted = true;
    }
+}
 
-    pub fn add_row(
-        &mut self,
-        group_key: Vec<Value<'row_group>>,
-        aggregates: Vec<AggregateResult<'row_group>>,
-    ) {
-        self.group_keys.push(GroupKey(group_key));
-        self.aggregates.push(AggregateResults(aggregates));
+// The `GroupKey` struct is a wrapper over a specific row of data in grouping
+// columns.
+//
+// Rather than pivot the columns into a row-wise orientation to sort them, we
+// can effectively sort a projection across them (`row_offset`) by storing
+// `GroupKey`s in a vector and sorting that.
+struct GroupKey<'a> {
+    columns: &'a [Vec<Option<&'a str>>],
+    row_offset: usize,
+}
+
+impl<'a> GroupKey<'a> {
+    fn new(columns: &'a [Vec<Option<&'a str>>], offset: usize) -> Self {
+        Self {
+            columns,
+            row_offset: offset,
+        }
+    }
+
+    // The number of columns comprising the `GroupKey`.
+    fn len(&self) -> usize {
+        self.columns.len()
    }
 }
 
 impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
     type Error = Error;
 
-    fn try_from(result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
+    fn try_from(mut result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
         let schema = internal_types::schema::Schema::try_from(result.schema())
             .map_err(|source| Error::SchemaError { source })?;
         let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into();
 
-        // Build the columns for the group keys. This involves pivoting the
-        // row-wise group keys into column-wise data.
-        let mut group_column_builders = (0..result.schema.group_columns.len())
-            .map(|_| {
-                arrow::array::StringBuilder::with_capacity(
-                    result.cardinality(),
-                    result.cardinality() * 8, // arbitrarily picked for now
-                )
-            })
-            .collect::<Vec<_>>();
-
-        // build each column for a group key value row by row.
-        for gk in result.group_keys.iter() {
-            for (i, v) in gk.0.iter().enumerate() {
-                group_column_builders[i]
-                    .append_value(v.string())
-                    .map_err(|source| Error::ArrowError { source })?;
-            }
-        }
-
         // Add the group columns to the set of column data for the record batch.
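// ---- editor's aside (illustrative sketch, not part of this diff) ----
// The `permutation` crate used above derives a sort order from one slice and
// replays it on others, which is how the parallel group-key and aggregate
// columns stay aligned. A self-contained example of that crate's API:
fn main() {
    let keys = vec!["west", "east", "north"];
    let counts = vec![Some(6u64), Some(8), Some(2)];

    // Sorting `keys` yields a permutation...
    let perm = permutation::sort(keys.as_slice());

    // ...that can then be applied to every column in the result.
    assert_eq!(perm.apply_slice(keys.as_slice()), vec!["east", "north", "west"]);
    assert_eq!(perm.apply_slice(counts.as_slice()), vec![Some(8), Some(2), Some(6)]);
}
// ---- end aside ----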
let mut columns: Vec> = Vec::with_capacity(result.schema.len()); - for col in group_column_builders.iter_mut() { - columns.push(Arc::new(col.finish())); + + for (_, data_type) in &result.schema.group_columns { + match data_type { + LogicalDataType::String => { + columns.push(Arc::new(array::StringArray::from( + result.group_key_cols.remove(0), // move column out of result + ))); + } + _ => panic!("only String currently supported as group column"), + } } - // For the aggregate columns, build one column at a time, repeatedly - // iterating rows until all columns have been built. - // - // TODO(edd): I don't like this *at all*. I'm going to refactor the way - // aggregates are produced. - for (i, (_, _, data_type)) in result.schema.aggregate_columns.iter().enumerate() { + for (_, _, data_type) in &result.schema.aggregate_columns { match data_type { LogicalDataType::Integer => { - let mut builder = array::Int64Builder::new(result.cardinality()); - for agg_row in &result.aggregates { - builder - .append_option(agg_row.0[i].try_as_i64_scalar()) - .context(ArrowError)?; - } - columns.push(Arc::new(builder.finish())); + columns.push(Arc::new(array::Int64Array::from( + result.aggregate_cols.remove(0).take_as_i64(), + ))); } LogicalDataType::Unsigned => { - let mut builder = array::UInt64Builder::new(result.cardinality()); - for agg_row in &result.aggregates { - builder - .append_option(agg_row.0[i].try_as_u64_scalar()) - .context(ArrowError)?; - } - columns.push(Arc::new(builder.finish())); + columns.push(Arc::new(array::UInt64Array::from( + result.aggregate_cols.remove(0).take_as_u64(), + ))); } LogicalDataType::Float => { - let mut builder = array::Float64Builder::new(result.cardinality()); - for agg_row in &result.aggregates { - builder - .append_option(agg_row.0[i].try_as_f64_scalar()) - .context(ArrowError)?; - } - columns.push(Arc::new(builder.finish())); + columns.push(Arc::new(array::Float64Array::from( + result.aggregate_cols.remove(0).take_as_f64(), + ))); } LogicalDataType::String => { - let mut builder = array::StringBuilder::new(result.cardinality()); - for agg_row in &result.aggregates { - match agg_row.0[i].try_as_str() { - Some(s) => builder.append_value(s).context(ArrowError)?, - None => builder.append_null().context(ArrowError)?, - } - } - columns.push(Arc::new(builder.finish())); + columns.push(Arc::new(array::StringArray::from( + result + .aggregate_cols + .remove(0) + .take_as_str() + .iter() + .map(|x| x.as_deref()) + .collect::>(), + ))); } LogicalDataType::Binary => { - let mut builder = array::BinaryBuilder::new(result.cardinality()); - for agg_row in &result.aggregates { - match agg_row.0[i].try_as_bytes() { - Some(s) => builder.append_value(s).context(ArrowError)?, - None => builder.append_null().context(ArrowError)?, - } - } - columns.push(Arc::new(builder.finish())); + columns.push(Arc::new(array::BinaryArray::from( + result + .aggregate_cols + .remove(0) + .take_as_bytes() + .iter() + .map(|x| x.as_deref()) + .collect::>(), + ))); } LogicalDataType::Boolean => { - let mut builder = array::BooleanBuilder::new(result.cardinality()); - for agg_row in &result.aggregates { - builder - .append_option(agg_row.0[i].try_as_bool()) - .context(ArrowError)?; - } - columns.push(Arc::new(builder.finish())); + columns.push(Arc::new(array::BooleanArray::from( + result.aggregate_cols.remove(0).take_as_bool(), + ))); } } } + // everything has been moved and copied into record batch. 
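// ---- editor's aside (illustrative sketch, not part of this diff) ----
// The arrays above are built by moving each column's Vec straight into an
// Arrow array, with `From<Vec<Option<_>>>` preserving NULLs and avoiding the
// old row-by-row builders. A reduced sketch of the same construction,
// assuming the `arrow` crate that `arrow_deps` re-exports:
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("region", DataType::Utf8, true),
        Field::new("temp_sum", DataType::Int64, true),
    ]));

    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec![Some("east"), Some("west")])),
        Arc::new(Int64Array::from(vec![Some(30), None])),
    ];

    let batch = RecordBatch::try_new(schema, columns).unwrap();
    assert_eq!(batch.num_rows(), 2);
}
// ---- end aside ----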
+ assert!(result.group_key_cols.is_empty()); + assert!(result.aggregate_cols.is_empty()); + // try_new only returns an error if the schema is invalid or the number // of rows on columns differ. We have full control over both so there // should never be an error to return... @@ -1989,8 +2125,8 @@ impl TryFrom> for RecordBatch { impl PartialEq for ReadAggregateResult<'_> { fn eq(&self, other: &Self) -> bool { self.schema() == other.schema() - && self.group_keys == other.group_keys - && self.aggregates == other.aggregates + && self.group_key_cols == other.group_key_cols + && self.aggregate_cols == other.aggregate_cols } } @@ -2015,24 +2151,27 @@ impl std::fmt::Display for &ReadAggregateResult<'_> { } // There may or may not be group keys - let expected_rows = self.aggregates.len(); + let expected_rows = self.rows(); for row in 0..expected_rows { if row > 0 { writeln!(f)?; } // write row for group by columns - if !self.group_keys.is_empty() { - for value in self.group_keys[row].0.iter() { - write!(f, "{},", value)?; + if self.is_grouped_aggregate() { + for col in &self.group_key_cols { + match col[row] { + Some(v) => write!(f, "{},", v)?, + None => write!(f, "NULL,")?, + } } } // write row for aggregate columns - for (col_i, agg) in self.aggregates[row].0.iter().enumerate() { - write!(f, "{}", agg)?; - if col_i < self.aggregates[row].0.len() - 1 { - write!(f, ",")?; + for (i, col) in self.aggregate_cols.iter().enumerate() { + col.write_value(row, f)?; + if i < self.aggregate_cols.len() - 1 { + write!(f, ",")? } } } @@ -2235,7 +2374,7 @@ west,4 } #[test] - fn read_group() { + fn read_aggregate() { let mut columns = BTreeMap::new(); let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..])); columns.insert("time".to_string(), tc); @@ -2279,20 +2418,20 @@ west,4 // test queries with no predicates and grouping on low cardinality // columns - read_group_all_rows_all_rle(&row_group); + read_aggregate_all_rows_all_rle(&row_group); // test row group queries that group on fewer than five columns. - read_group_hash_u128_key(&row_group); + read_aggregate_hash_u128_key(&row_group); // test row group queries that use a vector-based group key. - read_group_hash_vec_key(&row_group); + read_aggregate_hash_vec_key(&row_group); // test row group queries that only group on one column. - read_group_single_groupby_column(&row_group); + read_aggregate_single_groupby_column(&row_group); } // the read_group path where grouping is on fewer than five columns. - fn read_group_hash_u128_key(row_group: &RowGroup) { + fn read_aggregate_hash_u128_key(row_group: &RowGroup) { let cases = vec![ ( Predicate::with_time_range(&[], 0, 7), // all time but without explicit pred @@ -2364,7 +2503,7 @@ west,prod,POST,4 // the read_group path where grouping is on five or more columns. This will // ensure that the `read_group_hash_with_vec_key` path is exercised. - fn read_group_hash_vec_key(row_group: &RowGroup) { + fn read_aggregate_hash_vec_key(row_group: &RowGroup) { let cases = vec![( Predicate::with_time_range(&[], 0, 7), // all time but with explicit pred vec!["region", "method", "env", "letters", "numbers"], @@ -2387,7 +2526,7 @@ west,POST,prod,Bravo,two,203 } // the read_group path where grouping is on a single column. 
- fn read_group_single_groupby_column(row_group: &RowGroup) { + fn read_aggregate_single_groupby_column(row_group: &RowGroup) { let cases = vec![( Predicate::with_time_range(&[], 0, 7), // all time but with explicit pred vec!["method"], @@ -2406,7 +2545,7 @@ PUT,203 } } - fn read_group_all_rows_all_rle(row_group: &RowGroup) { + fn read_aggregate_all_rows_all_rle(row_group: &RowGroup) { let cases = vec![ ( Predicate::default(), @@ -2467,7 +2606,7 @@ west,POST,304,101,203 } #[test] - fn row_group_could_satisfy_predicate() { + fn row_aggregate_could_satisfy_predicate() { let mut columns = BTreeMap::new(); let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..])); columns.insert("time".to_string(), tc); @@ -2523,7 +2662,7 @@ west,POST,304,101,203 } #[test] - fn row_group_satisfies_predicate() { + fn row_aggregate_satisfies_predicate() { let mut columns = BTreeMap::new(); let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..])); columns.insert("time".to_string(), tc); @@ -2612,7 +2751,7 @@ west,POST,304,101,203 } #[test] - fn read_group_result_display() { + fn read_aggregate_result_display() { let mut result = ReadAggregateResult { schema: ResultSchema { select_columns: vec![], @@ -2639,37 +2778,27 @@ west,POST,304,101,203 ), ], }, - group_keys: vec![ - GroupKey(vec![Value::String("east"), Value::String("host-a")]), - GroupKey(vec![Value::String("east"), Value::String("host-b")]), - GroupKey(vec![Value::String("west"), Value::String("host-a")]), - GroupKey(vec![Value::String("west"), Value::String("host-c")]), - GroupKey(vec![Value::String("west"), Value::String("host-d")]), + group_key_cols: vec![ + vec![ + Some("east"), + Some("east"), + Some("west"), + Some("west"), + Some("west"), + ], + vec![ + Some("host-a"), + Some("host-b"), + Some("host-a"), + Some("host-c"), + Some("host-d"), + ], ], - aggregates: vec![ - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(10)), - AggregateResult::Count(3), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(20)), - AggregateResult::Count(4), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(25)), - AggregateResult::Count(3), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(21)), - AggregateResult::Count(1), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(11)), - AggregateResult::Count(9), - ]), + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(10), Some(20), Some(25), Some(21), Some(11)]), + AggregateVec::Count(vec![Some(3), Some(4), Some(3), Some(1), Some(9)]), ], group_keys_sorted: false, - ..Default::default() }; // Debug implementation @@ -2697,7 +2826,7 @@ west,host-d,11,9 // results don't have to have group keys. 
result.schema.group_columns = vec![]; - result.group_keys = vec![]; + result.group_key_cols = vec![]; // Debug implementation assert_eq!( @@ -2724,7 +2853,7 @@ west,host-d,11,9 } #[test] - fn read_group_result_merge() { + fn read_aggregate_result_merge() { let schema = ResultSchema { group_columns: vec![ ( @@ -2756,24 +2885,18 @@ west,host-d,11,9 ..Default::default() }; - let mut other_result = ReadAggregateResult { + let other_result = ReadAggregateResult { schema: schema.clone(), + group_key_cols: vec![ + vec![Some("east"), Some("east")], + vec![Some("host-a"), Some("host-b")], + ], + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(10), Some(20)]), + AggregateVec::Count(vec![Some(3), Some(4)]), + ], ..Default::default() }; - other_result.add_row( - vec![Value::String("east"), Value::String("host-a")], - vec![ - AggregateResult::Sum(Scalar::I64(10)), - AggregateResult::Count(3), - ], - ); - other_result.add_row( - vec![Value::String("east"), Value::String("host-b")], - vec![ - AggregateResult::Sum(Scalar::I64(20)), - AggregateResult::Count(4), - ], - ); // merging something into nothing results in having a copy of something. result = result.merge(other_result.clone()); @@ -2786,19 +2909,13 @@ west,host-d,11,9 result, ReadAggregateResult { schema: schema.clone(), - group_keys: vec![ - GroupKey(vec![Value::String("east"), Value::String("host-a")]), - GroupKey(vec![Value::String("east"), Value::String("host-b")]), + group_key_cols: vec![ + vec![Some("east"), Some("east")], + vec![Some("host-a"), Some("host-b")], ], - aggregates: vec![ - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(20)), - AggregateResult::Count(6), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(40)), - AggregateResult::Count(8), - ]), + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(20), Some(40)]), + AggregateVec::Count(vec![Some(6), Some(8)]), ], ..Default::default() } @@ -2806,41 +2923,28 @@ west,host-d,11,9 // merging a result in with different group keys merges those group // keys in. 
- let mut other_result = ReadAggregateResult { + let other_result = ReadAggregateResult { schema: schema.clone(), + group_key_cols: vec![vec![Some("north")], vec![Some("host-a")]], + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(-5)]), + AggregateVec::Count(vec![Some(2)]), + ], ..Default::default() }; - other_result.add_row( - vec![Value::String("north"), Value::String("host-a")], - vec![ - AggregateResult::Sum(Scalar::I64(-5)), - AggregateResult::Count(2), - ], - ); result = result.merge(other_result.clone()); assert_eq!( result, ReadAggregateResult { schema: schema.clone(), - group_keys: vec![ - GroupKey(vec![Value::String("east"), Value::String("host-a")]), - GroupKey(vec![Value::String("east"), Value::String("host-b")]), - GroupKey(vec![Value::String("north"), Value::String("host-a")]), + group_key_cols: vec![ + vec![Some("east"), Some("east"), Some("north")], + vec![Some("host-a"), Some("host-b"), Some("host-a")], ], - aggregates: vec![ - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(20)), - AggregateResult::Count(6), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(40)), - AggregateResult::Count(8), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(-5)), - AggregateResult::Count(2), - ]), + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(20), Some(40), Some(-5)]), + AggregateVec::Count(vec![Some(6), Some(8), Some(2)]), ], ..Default::default() } @@ -2857,30 +2961,45 @@ west,host-d,11,9 result, ReadAggregateResult { schema, - group_keys: vec![ - GroupKey(vec![Value::String("east"), Value::String("host-a")]), - GroupKey(vec![Value::String("east"), Value::String("host-b")]), - GroupKey(vec![Value::String("north"), Value::String("host-a")]), + group_key_cols: vec![ + vec![Some("east"), Some("east"), Some("north")], + vec![Some("host-a"), Some("host-b"), Some("host-a")], ], - aggregates: vec![ - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(20)), - AggregateResult::Count(6), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(40)), - AggregateResult::Count(8), - ]), - AggregateResults(vec![ - AggregateResult::Sum(Scalar::I64(-5)), - AggregateResult::Count(2), - ]), + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(20), Some(40), Some(-5)]), + AggregateVec::Count(vec![Some(6), Some(8), Some(2)]), ], ..Default::default() } ); } + #[test] + fn read_aggregate_result_sort() { + let mut result = ReadAggregateResult { + schema: ResultSchema::default(), + group_key_cols: vec![ + vec![Some("west"), Some("east"), Some("north")], + vec![Some("host-c"), Some("host-c"), Some("host-c")], + vec![Some("pro"), Some("stag"), Some("dev")], + ], + aggregate_cols: vec![ + AggregateVec::SumI64(vec![Some(10), Some(20), Some(-5)]), + AggregateVec::Count(vec![Some(6), Some(8), Some(2)]), + ], + ..Default::default() + }; + result.sort(); + + assert_eq!( + format!("{}", &result), + "east,host-c,stag,20,8 +north,host-c,dev,-5,2 +west,host-c,pro,10,6 +" + ); + } + #[test] fn column_meta_equal() { let col1 = ColumnMeta { diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs index 9d9828ff2a..a9964356b1 100644 --- a/read_buffer/src/schema.rs +++ b/read_buffer/src/schema.rs @@ -50,6 +50,7 @@ impl ResultSchema { self.len() == 0 } + /// The total number of columns the schema represents. 
    pub fn len(&self) -> usize {
        self.select_columns.len() + self.group_columns.len() + self.aggregate_columns.len()
    }
diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs
index f11c56e1b6..23f630cf81 100644
--- a/read_buffer/src/table.rs
+++ b/read_buffer/src/table.rs
@@ -10,7 +10,7 @@ use arrow_deps::arrow::record_batch::RecordBatch;
 use internal_types::selection::Selection;
 use snafu::{ensure, Snafu};
 
-use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup};
+use crate::row_group::{self, ColumnName, Predicate, RowGroup};
 use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
 use crate::value::{AggregateResult, Scalar, Value};
 
 #[derive(Debug, Snafu)]
@@ -273,7 +273,7 @@ impl Table {
         _group_columns: Vec<ColumnName<'a>>,
         _aggregates: Vec<(ColumnName<'a>, AggregateType)>,
         _window: i64,
-    ) -> BTreeMap<GroupKey<'_>, Vec<(ColumnName<'a>, AggregateResult<'_>)>> {
+    ) -> BTreeMap<Vec<Value<'_>>, Vec<(ColumnName<'a>, AggregateResult<'_>)>> {
         // identify segments where time range and predicates match could match
         // using segment meta data, and then execute against those segments and
         // merge results.
@@ -942,7 +942,7 @@ mod test {
     use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult};
     use crate::schema;
     use crate::schema::LogicalDataType;
-    use crate::value::{OwnedValue, Scalar};
+    use crate::value::{AggregateVec, OwnedValue, Scalar};
 
     #[test]
     fn meta_data_update_with() {
@@ -1237,7 +1237,7 @@ mod test {
     #[test]
     fn read_aggregate_result_display() {
-        let mut result_a = ReadAggregateResult {
+        let result_a = ReadAggregateResult {
             schema: ResultSchema {
                 select_columns: vec![],
                 group_columns: vec![
@@ -1256,14 +1256,12 @@ mod test {
                     LogicalDataType::Integer,
                 )],
             },
+            group_key_cols: vec![vec![Some("east")], vec![Some("host-a")]],
+            aggregate_cols: vec![AggregateVec::SumI64(vec![Some(10)])],
             ..ReadAggregateResult::default()
         };
-        result_a.add_row(
-            vec![Value::String("east"), Value::String("host-a")],
-            vec![AggregateResult::Sum(Scalar::I64(10))],
-        );
 
-        let mut result_b = ReadAggregateResult {
+        let result_b = ReadAggregateResult {
             schema: ResultSchema {
                 select_columns: vec![],
                 group_columns: vec![
@@ -1282,12 +1280,10 @@ mod test {
                     LogicalDataType::Integer,
                 )],
             },
+            group_key_cols: vec![vec![Some("west")], vec![Some("host-b")]],
+            aggregate_cols: vec![AggregateVec::SumI64(vec![Some(100)])],
             ..Default::default()
         };
-        result_b.add_row(
-            vec![Value::String("west"), Value::String("host-b")],
-            vec![AggregateResult::Sum(Scalar::I64(100))],
-        );
 
         let results = DisplayReadAggregateResults(vec![result_a, result_b]);
         // Display implementation
         assert_eq!(
diff --git a/read_buffer/src/value.rs b/read_buffer/src/value.rs
index 94ecdd8cb1..85ec979c5d 100644
--- a/read_buffer/src/value.rs
+++ b/read_buffer/src/value.rs
@@ -1,13 +1,13 @@
-use std::{collections::BTreeSet, convert::TryFrom};
+use std::{collections::BTreeSet, convert::TryFrom, fmt::Formatter};
 use std::{mem::size_of, sync::Arc};
 
 use arrow_deps::arrow;
 
 use crate::{AggregateType, LogicalDataType};
 
-#[derive(Clone)]
-pub enum AggregateVec<'a> {
-    Count(Vec<u64>),
+#[derive(Clone, PartialEq, Debug)]
+pub enum AggregateVec {
+    Count(Vec<Option<u64>>),
 
     SumI64(Vec<Option<i64>>),
     SumU64(Vec<Option<u64>>),
@@ -16,45 +16,104 @@ pub enum AggregateVec<'a> {
     MinU64(Vec<Option<u64>>),
     MinI64(Vec<Option<i64>>),
     MinF64(Vec<Option<f64>>),
-    MinString(Vec<Option<&'a str>>),
-    MinBytes(Vec<Option<&'a [u8]>>),
+    MinString(Vec<Option<String>>),
+    MinBytes(Vec<Option<Vec<u8>>>),
     MinBool(Vec<Option<bool>>),
 
     MaxU64(Vec<Option<u64>>),
     MaxI64(Vec<Option<i64>>),
     MaxF64(Vec<Option<f64>>),
-    MaxString(Vec<Option<&'a str>>),
-    MaxBytes(Vec<Option<&'a [u8]>>),
+    MaxString(Vec<Option<String>>),
+    MaxBytes(Vec<Option<Vec<u8>>>),
     MaxBool(Vec<Option<bool>>),
 
     FirstU64((Vec<Option<u64>>, Vec<Option<i64>>)),
     FirstI64((Vec<Option<i64>>, Vec<Option<i64>>)),
     FirstF64((Vec<Option<f64>>, Vec<Option<i64>>)),
-    FirstString((Vec<Option<&'a str>>, Vec<Option<i64>>)),
-    FirstBytes((Vec<Option<&'a [u8]>>, Vec<Option<i64>>)),
+    FirstString((Vec<Option<String>>, Vec<Option<i64>>)),
+    FirstBytes((Vec<Option<Vec<u8>>>, Vec<Option<i64>>)),
     FirstBool((Vec<Option<bool>>, Vec<Option<i64>>)),
 
     LastU64((Vec<Option<u64>>, Vec<Option<i64>>)),
     LastI64((Vec<Option<i64>>, Vec<Option<i64>>)),
     LastF64((Vec<Option<f64>>, Vec<Option<i64>>)),
-    LastString((Vec<Option<&'a str>>, Vec<Option<i64>>)),
-    LastBytes((Vec<Option<&'a [u8]>>, Vec<Option<i64>>)),
+    LastString((Vec<Option<String>>, Vec<Option<i64>>)),
+    LastBytes((Vec<Option<Vec<u8>>>, Vec<Option<i64>>)),
     LastBool((Vec<Option<bool>>, Vec<Option<i64>>)),
 }
 
-impl AggregateVec<'_> {
+impl AggregateVec {
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Count(arr) => arr.len(),
+            Self::SumI64(arr) => arr.len(),
+            Self::SumU64(arr) => arr.len(),
+            Self::SumF64(arr) => arr.len(),
+            Self::MinU64(arr) => arr.len(),
+            Self::MinI64(arr) => arr.len(),
+            Self::MinF64(arr) => arr.len(),
+            Self::MinString(arr) => arr.len(),
+            Self::MinBytes(arr) => arr.len(),
+            Self::MinBool(arr) => arr.len(),
+            Self::MaxU64(arr) => arr.len(),
+            Self::MaxI64(arr) => arr.len(),
+            Self::MaxF64(arr) => arr.len(),
+            Self::MaxString(arr) => arr.len(),
+            Self::MaxBytes(arr) => arr.len(),
+            Self::MaxBool(arr) => arr.len(),
+            Self::FirstU64((arr, _)) => arr.len(),
+            Self::FirstI64((arr, _)) => arr.len(),
+            Self::FirstF64((arr, _)) => arr.len(),
+            Self::FirstString((arr, _)) => arr.len(),
+            Self::FirstBytes((arr, _)) => arr.len(),
+            Self::FirstBool((arr, _)) => arr.len(),
+            Self::LastU64((arr, _)) => arr.len(),
+            Self::LastI64((arr, _)) => arr.len(),
+            Self::LastF64((arr, _)) => arr.len(),
+            Self::LastString((arr, _)) => arr.len(),
+            Self::LastBytes((arr, _)) => arr.len(),
+            Self::LastBool((arr, _)) => arr.len(),
+        }
+    }
+
+    /// Returns the value specified by `offset`.
+    pub fn value(&self, offset: usize) -> Value<'_> {
+        match &self {
+            Self::Count(arr) => Value::from(arr[offset]),
+            Self::SumI64(arr) => Value::from(arr[offset]),
+            Self::SumU64(arr) => Value::from(arr[offset]),
+            Self::SumF64(arr) => Value::from(arr[offset]),
+            Self::MinU64(arr) => Value::from(arr[offset]),
+            Self::MinI64(arr) => Value::from(arr[offset]),
+            Self::MinF64(arr) => Value::from(arr[offset]),
+            Self::MinString(arr) => Value::from(arr[offset].as_deref()),
+            Self::MinBytes(arr) => Value::from(arr[offset].as_deref()),
+            Self::MinBool(arr) => Value::from(arr[offset]),
+            Self::MaxU64(arr) => Value::from(arr[offset]),
+            Self::MaxI64(arr) => Value::from(arr[offset]),
+            Self::MaxF64(arr) => Value::from(arr[offset]),
+            Self::MaxString(arr) => Value::from(arr[offset].as_deref()),
+            Self::MaxBytes(arr) => Value::from(arr[offset].as_deref()),
+            Self::MaxBool(arr) => Value::from(arr[offset]),
+            _ => unimplemented!("first/last not yet implemented"),
+        }
+    }
+
     pub fn update(&mut self, values: &Values<'_>, row_id: usize, offset: usize) {
         match self {
-            AggregateVec::Count(arr) => {
+            Self::Count(arr) => {
                 if values.is_null(row_id) {
                     return;
                 } else if offset >= arr.len() {
-                    arr.resize(offset + 1, 0);
+                    arr.resize(offset + 1, None);
                 }
 
-                arr[offset] += 1;
+                match &mut arr[offset] {
+                    Some(v) => *v += 1,
+                    None => arr[offset] = Some(1),
+                }
             }
-            AggregateVec::SumI64(arr) => {
+            Self::SumI64(arr) => {
                 if values.is_null(row_id) {
                     return;
                 } else if offset >= arr.len() {
@@ -66,76 +125,871 @@ impl AggregateVec<'_> {
                     arr.resize(offset + 1, None);
                 }
 
                 match &mut arr[offset] {
                     Some(v) => *v += values.value_i64(row_id),
                     None => arr[offset] = Some(values.value_i64(row_id)),
                 }
             }
-            AggregateVec::SumU64(_) => {}
-            AggregateVec::SumF64(_) => {}
-            AggregateVec::MinU64(_) => {}
-            AggregateVec::MinI64(_) => {}
-            AggregateVec::MinF64(_) => {}
-            AggregateVec::MinString(_) => {}
-            AggregateVec::MinBytes(_) => {}
-            AggregateVec::MinBool(_) => {}
-            AggregateVec::MaxU64(_) => {}
-            AggregateVec::MaxI64(_) => {}
-            AggregateVec::MaxF64(_) => {}
-            AggregateVec::MaxString(_) => {}
-
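// ---- editor's aside (illustrative sketch, not part of this diff) ----
// The implemented `update` arms below all follow one shape: skip NULL inputs,
// grow the output vector on demand, then fold the new value into the
// Option-wrapped slot. The pattern in isolation, with a hypothetical helper:
fn update_sum(arr: &mut Vec<Option<i64>>, offset: usize, value: Option<i64>) {
    let value = match value {
        Some(v) => v,
        None => return, // a NULL input leaves the slot untouched
    };
    if offset >= arr.len() {
        arr.resize(offset + 1, None); // grow lazily; new slots start as NULL
    }
    match &mut arr[offset] {
        Some(v) => *v += value,
        None => arr[offset] = Some(value),
    }
}

fn main() {
    let mut sums = vec![];
    update_sum(&mut sums, 2, Some(5));
    update_sum(&mut sums, 2, Some(7));
    update_sum(&mut sums, 0, None);
    assert_eq!(sums, vec![None, None, Some(12)]);
}
// ---- end aside ----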
AggregateVec::MaxBytes(_) => {} - AggregateVec::MaxBool(_) => {} - AggregateVec::FirstU64(_) => {} - AggregateVec::FirstI64(_) => {} - AggregateVec::FirstF64(_) => {} - AggregateVec::FirstString(_) => {} - AggregateVec::FirstBytes(_) => {} - AggregateVec::FirstBool(_) => {} - AggregateVec::LastU64(_) => {} - AggregateVec::LastI64(_) => {} - AggregateVec::LastF64(_) => {} - AggregateVec::LastString(_) => {} - AggregateVec::LastBytes(_) => {} - AggregateVec::LastBool(_) => {} + Self::SumU64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v += values.value_u64(row_id), + None => arr[offset] = Some(values.value_u64(row_id)), + } + } + Self::SumF64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v += values.value_f64(row_id), + None => arr[offset] = Some(values.value_f64(row_id)), + } + } + Self::MinU64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).min(values.value_u64(row_id)), + None => arr[offset] = Some(values.value_u64(row_id)), + } + } + Self::MinI64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).min(values.value_i64(row_id)), + None => arr[offset] = Some(values.value_i64(row_id)), + } + } + Self::MinF64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).min(values.value_f64(row_id)), + None => arr[offset] = Some(values.value_f64(row_id)), + } + } + Self::MinString(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => { + let other = values.value_str(row_id); + if other < v.as_str() { + *v = other.to_owned(); + } + } + None => arr[offset] = Some(values.value_str(row_id).to_owned()), + } + } + Self::MinBytes(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => { + let other = values.value_bytes(row_id); + if other < v.as_slice() { + *v = other.to_owned(); + } + } + None => arr[offset] = Some(values.value_bytes(row_id).to_owned()), + } + } + Self::MinBool(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).min(values.value_bool(row_id)), + None => arr[offset] = Some(values.value_bool(row_id)), + } + } + Self::MaxU64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).max(values.value_u64(row_id)), + None => arr[offset] = Some(values.value_u64(row_id)), + } + } + Self::MaxI64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).max(values.value_i64(row_id)), + None => arr[offset] = Some(values.value_i64(row_id)), + } + } + Self::MaxF64(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + 
arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).max(values.value_f64(row_id)), + None => arr[offset] = Some(values.value_f64(row_id)), + } + } + Self::MaxString(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => { + let other = values.value_str(row_id); + if other > v.as_str() { + *v = other.to_owned(); + } + } + None => arr[offset] = Some(values.value_str(row_id).to_owned()), + } + } + Self::MaxBytes(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => { + let other = values.value_bytes(row_id); + if other > v.as_slice() { + *v = other.to_owned(); + } + } + None => arr[offset] = Some(values.value_bytes(row_id).to_owned()), + } + } + Self::MaxBool(arr) => { + if values.is_null(row_id) { + return; + } else if offset >= arr.len() { + arr.resize(offset + 1, None); + } + + match &mut arr[offset] { + Some(v) => *v = (*v).max(values.value_bool(row_id)), + None => arr[offset] = Some(values.value_bool(row_id)), + } + } + // TODO - implement first/last + _ => unimplemented!("aggregate update not implemented"), } } /// Appends the provided value to the end of the aggregate vector. /// Panics if the type of `Value` does not satisfy the aggregate type. - pub fn push(&mut self, value: &Value<'_>) { + /// + /// Note: updating pushed first/last variants is not currently a supported + /// operation. + pub fn push(&mut self, value: Value<'_>) { match self { - AggregateVec::Count(arr) => arr.push(value.u64()), - AggregateVec::SumI64(arr) => arr.push(Some(value.i64())), - AggregateVec::SumU64(_) => {} - AggregateVec::SumF64(_) => {} - AggregateVec::MinU64(_) => {} - AggregateVec::MinI64(_) => {} - AggregateVec::MinF64(_) => {} - AggregateVec::MinString(_) => {} - AggregateVec::MinBytes(_) => {} - AggregateVec::MinBool(_) => {} - AggregateVec::MaxU64(_) => {} - AggregateVec::MaxI64(_) => {} - AggregateVec::MaxF64(_) => {} - AggregateVec::MaxString(_) => {} - AggregateVec::MaxBytes(_) => {} - AggregateVec::MaxBool(_) => {} - AggregateVec::FirstU64(_) => {} - AggregateVec::FirstI64(_) => {} - AggregateVec::FirstF64(_) => {} - AggregateVec::FirstString(_) => {} - AggregateVec::FirstBytes(_) => {} - AggregateVec::FirstBool(_) => {} - AggregateVec::LastU64(_) => {} - AggregateVec::LastI64(_) => {} - AggregateVec::LastF64(_) => {} - AggregateVec::LastString(_) => {} - AggregateVec::LastBytes(_) => {} - AggregateVec::LastBool(_) => {} + Self::Count(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.u64())); + } + } + Self::SumI64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.i64())); + } + } + Self::SumU64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.u64())); + } + } + Self::SumF64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.f64())); + } + } + Self::MinU64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.u64())); + } + } + Self::MinI64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.i64())); + } + } + Self::MinF64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.f64())); + } + } + Self::MinString(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.str().to_owned())); + } + } + 
Self::MinBytes(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bytes().to_owned())); + } + } + Self::MinBool(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bool())); + } + } + Self::MaxU64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.u64())); + } + } + Self::MaxI64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.i64())); + } + } + Self::MaxF64(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.f64())); + } + } + Self::MaxString(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.str().to_owned())); + } + } + Self::MaxBytes(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bytes().to_owned())); + } + } + Self::MaxBool(arr) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bool())); + } + } + Self::FirstU64((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.u64())); + } + } + Self::FirstI64((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.i64())); + } + } + Self::FirstF64((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.f64())); + } + } + Self::FirstString((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.str().to_owned())); + } + } + Self::FirstBytes((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bytes().to_owned())); + } + } + Self::FirstBool((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bool())); + } + } + Self::LastU64((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.u64())); + } + } + Self::LastI64((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.i64())); + } + } + Self::LastF64((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.f64())); + } + } + Self::LastString((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.str().to_owned())); + } + } + Self::LastBytes((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bytes().to_owned())); + } + } + Self::LastBool((arr, _)) => { + if value.is_null() { + arr.push(None); + } else { + arr.push(Some(value.bool())); + } + } + } + } + + pub fn write_value(&self, offset: usize, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Count(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::SumI64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::SumU64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::SumF64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MinU64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MinI64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MinF64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MinString(arr) => match &arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MinBytes(arr) => match 
&arr[offset] { + Some(v) => write!(f, "{:?}", v)?, + None => write!(f, "NULL")?, + }, + Self::MinBool(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MaxU64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MaxI64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MaxF64(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MaxString(arr) => match &arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::MaxBytes(arr) => match &arr[offset] { + Some(v) => write!(f, "{:?}", v)?, + None => write!(f, "NULL")?, + }, + Self::MaxBool(arr) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::FirstU64((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::FirstI64((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::FirstF64((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::FirstString((arr, _)) => match &arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::FirstBytes((arr, _)) => match &arr[offset] { + Some(v) => write!(f, "{:?}", v)?, + None => write!(f, "NULL")?, + }, + Self::FirstBool((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::LastU64((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::LastI64((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::LastF64((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::LastString((arr, _)) => match &arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + Self::LastBytes((arr, _)) => match &arr[offset] { + Some(v) => write!(f, "{:?}", v)?, + None => write!(f, "NULL")?, + }, + Self::LastBool((arr, _)) => match arr[offset] { + Some(v) => write!(f, "{}", v)?, + None => write!(f, "NULL")?, + }, + } + Ok(()) + } + + // Consumes self and returns the inner `Vec>`. + pub fn as_i64(&self) -> &Vec> { + match self { + Self::SumI64(arr) => arr, + Self::MinI64(arr) => arr, + Self::MaxI64(arr) => arr, + _ => panic!("cannot convert {} to Vec>", self), + } + } + + // Consumes self and returns the inner `Vec>`. + pub fn as_u64(&self) -> &Vec> { + match self { + Self::Count(arr) => arr, + Self::SumU64(arr) => arr, + Self::MinU64(arr) => arr, + Self::MaxU64(arr) => arr, + _ => panic!("cannot convert {} to Vec>", self), + } + } + + // Consumes self and returns the inner `Vec>`. + pub fn as_f64(&self) -> &Vec> { + match self { + Self::SumF64(arr) => arr, + Self::MinF64(arr) => arr, + Self::MaxF64(arr) => arr, + _ => panic!("cannot convert {} to Vec>", self), + } + } + + // Consumes self and returns the inner `Vec>`. + pub fn as_str(&self) -> &Vec> { + match self { + Self::MinString(arr) => arr, + Self::MaxString(arr) => arr, + _ => panic!("cannot convert {} to Vec>", self), + } + } + + // Consumes self and returns the inner `Vec>`. 
+
+    // Consumes self and returns the inner `Vec<Option<i64>>`.
+    pub fn take_as_i64(self) -> Vec<Option<i64>> {
+        match self {
+            Self::SumI64(arr) => arr,
+            Self::MinI64(arr) => arr,
+            Self::MaxI64(arr) => arr,
+            _ => panic!("cannot convert {} to Vec<Option<i64>>", self),
+        }
+    }
+
+    // Consumes self and returns the inner `Vec<Option<u64>>`.
+    pub fn take_as_u64(self) -> Vec<Option<u64>> {
+        match self {
+            Self::Count(arr) => arr,
+            Self::SumU64(arr) => arr,
+            Self::MinU64(arr) => arr,
+            Self::MaxU64(arr) => arr,
+            _ => panic!("cannot convert {} to Vec<Option<u64>>", self),
+        }
+    }
+
+    // Consumes self and returns the inner `Vec<Option<f64>>`.
+    pub fn take_as_f64(self) -> Vec<Option<f64>> {
+        match self {
+            Self::SumF64(arr) => arr,
+            Self::MinF64(arr) => arr,
+            Self::MaxF64(arr) => arr,
+            _ => panic!("cannot convert {} to Vec<Option<f64>>", self),
+        }
+    }
+
+    // Consumes self and returns the inner `Vec<Option<String>>`.
+    pub fn take_as_str(self) -> Vec<Option<String>> {
+        match self {
+            Self::MinString(arr) => arr,
+            Self::MaxString(arr) => arr,
+            _ => panic!("cannot convert {} to Vec<Option<String>>", self),
+        }
+    }
+
+    // Consumes self and returns the inner `Vec<Option<Vec<u8>>>`.
+    pub fn take_as_bytes(self) -> Vec<Option<Vec<u8>>> {
+        match self {
+            Self::MinBytes(arr) => arr,
+            Self::MaxBytes(arr) => arr,
+            _ => panic!("cannot convert {} to Vec<Option<Vec<u8>>>", self),
+        }
+    }
+
+    // Consumes self and returns the inner `Vec<Option<bool>>`.
+    pub fn take_as_bool(self) -> Vec<Option<bool>> {
+        match self {
+            Self::MinBool(arr) => arr,
+            Self::MaxBool(arr) => arr,
+            _ => panic!("cannot convert {} to Vec<Option<bool>>", self),
+        }
+    }
+
+    /// Appends the `AggregateVec` with the provided `Option<i64>` iterator.
+    pub fn extend_with_i64(&mut self, itr: impl Iterator<Item = Option<i64>>) {
+        match self {
+            Self::SumI64(arr) => {
+                arr.extend(itr);
+            }
+            Self::MinI64(arr) => {
+                arr.extend(itr);
+            }
+            Self::MaxI64(arr) => {
+                arr.extend(itr);
+            }
+            _ => panic!("unsupported iterator"),
+        }
+    }
+
+    /// Appends the `AggregateVec` with the provided `Option<u64>` iterator.
+    pub fn extend_with_u64(&mut self, itr: impl Iterator<Item = Option<u64>>) {
+        match self {
+            Self::Count(arr) => {
+                arr.extend(itr);
+            }
+            Self::SumU64(arr) => {
+                arr.extend(itr);
+            }
+            Self::MinU64(arr) => {
+                arr.extend(itr);
+            }
+            Self::MaxU64(arr) => {
+                arr.extend(itr);
+            }
+            _ => panic!("unsupported iterator"),
+        }
+    }
+
+    /// Appends the `AggregateVec` with the provided `Option<f64>` iterator.
+    pub fn extend_with_f64(&mut self, itr: impl Iterator<Item = Option<f64>>) {
+        match self {
+            Self::SumF64(arr) => {
+                arr.extend(itr);
+            }
+            Self::MinF64(arr) => {
+                arr.extend(itr);
+            }
+            Self::MaxF64(arr) => {
+                arr.extend(itr);
+            }
+            _ => panic!("unsupported iterator"),
+        }
+    }
+
+    /// Appends the `AggregateVec` with the provided `Option<String>` iterator.
+    pub fn extend_with_str(&mut self, itr: impl Iterator<Item = Option<String>>) {
+        match self {
+            Self::MinString(arr) => {
+                arr.extend(itr);
+            }
+            Self::MaxString(arr) => {
+                arr.extend(itr);
+            }
+            _ => panic!("unsupported iterator"),
+        }
+    }
+
+    /// Appends the `AggregateVec` with the provided `Option<Vec<u8>>` iterator.
+    pub fn extend_with_bytes(&mut self, itr: impl Iterator<Item = Option<Vec<u8>>>) {
+        match self {
+            Self::MinBytes(arr) => {
+                arr.extend(itr);
+            }
+            Self::MaxBytes(arr) => {
+                arr.extend(itr);
+            }
+            _ => panic!("unsupported iterator"),
+        }
+    }
+
+    /// Appends the `AggregateVec` with the provided `Option<bool>` iterator.
+    pub fn extend_with_bool(&mut self, itr: impl Iterator<Item = Option<bool>>) {
+        match self {
+            Self::MinBool(arr) => {
+                arr.extend(itr);
+            }
+            Self::MaxBool(arr) => {
+                arr.extend(itr);
+            }
+            _ => panic!("unsupported iterator"),
+        }
+    }
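The `extend_with_*` methods work because `Vec::extend` accepts any iterator over the element type, so nullable column data can be appended without an intermediate allocation. A std-only sketch of the same mechanism (types are illustrative):

```rust
fn main() {
    // Mirrors extend_with_u64 above: the target vector holds Option<u64>
    // slots and the incoming iterator yields Option<u64> items directly.
    let mut arr: Vec<Option<u64>> = vec![Some(1), None];
    let incoming = [Some(2u64), None, Some(4)];
    arr.extend(incoming.iter().copied());
    assert_eq!(arr, vec![Some(1), None, Some(2), None, Some(4)]);
}
```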
+
+    pub fn sort_with_permutation(&mut self, p: &permutation::Permutation) {
+        match self {
+            Self::Count(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::SumI64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::SumU64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::SumF64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MinU64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MinI64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MinF64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MinString(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MinBytes(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MinBool(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MaxU64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MaxI64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MaxF64(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MaxString(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MaxBytes(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::MaxBool(arr) => {
+                *arr = p.apply_slice(arr.as_slice());
+            }
+            Self::FirstU64((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::FirstI64((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::FirstF64((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::FirstString((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::FirstBytes((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::FirstBool((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::LastU64((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::LastI64((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::LastF64((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::LastString((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::LastBytes((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+            Self::LastBool((arr, time)) => {
+                *arr = p.apply_slice(arr.as_slice());
+                *time = p.apply_slice(time.as_slice());
+            }
+        }
+    }
 }
 
-impl From<(&AggregateType, &LogicalDataType, usize)> for AggregateVec<'_> {
+impl std::fmt::Display for AggregateVec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Count(_) => write!(f, "Count"),
+            Self::SumI64(_) => write!(f, "Sum<i64>"),
+            Self::SumU64(_) => write!(f, "Sum<u64>"),
+            Self::SumF64(_) => write!(f, "Sum<f64>"),
+            Self::MinU64(_) => write!(f, "Min<u64>"),
+            Self::MinI64(_) => write!(f, "Min<i64>"),
+            Self::MinF64(_) => write!(f, "Min<f64>"),
+            Self::MinString(_) => write!(f, "Min<String>"),
+            Self::MinBytes(_) => write!(f, "Min<Vec<u8>>"),
+            Self::MinBool(_) => write!(f, "Min<bool>"),
+            Self::MaxU64(_) => write!(f, "Max<u64>"),
+            Self::MaxI64(_) => write!(f, "Max<i64>"),
+            Self::MaxF64(_) => write!(f, "Max<f64>"),
+            Self::MaxString(_) => write!(f, "Max<String>"),
+            Self::MaxBytes(_) => write!(f, "Max<Vec<u8>>"),
+            Self::MaxBool(_) => write!(f, "Max<bool>"),
+            Self::FirstU64(_) => write!(f, "First<u64>"),
+            Self::FirstI64(_) => write!(f, "First<i64>"),
+            Self::FirstF64(_) => write!(f, "First<f64>"),
+            Self::FirstString(_) => write!(f, "First<String>"),
+            Self::FirstBytes(_) => write!(f, "First<Vec<u8>>"),
+            Self::FirstBool(_) => write!(f, "First<bool>"),
+            Self::LastU64(_) => write!(f, "Last<u64>"),
+            Self::LastI64(_) => write!(f, "Last<i64>"),
+            Self::LastF64(_) => write!(f, "Last<f64>"),
+            Self::LastString(_) => write!(f, "Last<String>"),
+            Self::LastBytes(_) => write!(f, "Last<Vec<u8>>"),
+            Self::LastBool(_) => write!(f, "Last<bool>"),
+        }
+    }
+}
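`sort_with_permutation` replays one precomputed reordering across every column (and the shadow `time` vector for First/Last) so rows stay aligned after a sort on the group keys. A sketch of how a caller drives it, assuming the `permutation` crate that the code above uses via `apply_slice`; building the permutation with `permutation::sort` is this sketch's choice, not something shown in the diff:

```rust
fn main() {
    // One column supplies the sort order...
    let keys = vec![3, 1, 2];
    // ...and every other column is reordered the same way.
    let counts: Vec<Option<u64>> = vec![Some(30), Some(10), None];

    let p = permutation::sort(&keys[..]);
    let sorted_keys = p.apply_slice(&keys[..]);
    let sorted_counts = p.apply_slice(&counts[..]);

    assert_eq!(sorted_keys, vec![1, 2, 3]);
    assert_eq!(sorted_counts, vec![Some(10), None, Some(30)]);
}
```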
write!(f, "Max"), + Self::MaxF64(_) => write!(f, "Max"), + Self::MaxString(_) => write!(f, "Max"), + Self::MaxBytes(_) => write!(f, "Max>"), + Self::MaxBool(_) => write!(f, "Max"), + Self::FirstU64(_) => write!(f, "First"), + Self::FirstI64(_) => write!(f, "First"), + Self::FirstF64(_) => write!(f, "First"), + Self::FirstString(_) => write!(f, "First"), + Self::FirstBytes(_) => write!(f, "First>"), + Self::FirstBool(_) => write!(f, "First"), + Self::LastU64(_) => write!(f, "Last"), + Self::LastI64(_) => write!(f, "Last"), + Self::LastF64(_) => write!(f, "Last"), + Self::LastString(_) => write!(f, "Last"), + Self::LastBytes(_) => write!(f, "Last>"), + Self::LastBool(_) => write!(f, "Last"), + } + } +} + +impl From<(&AggregateType, &LogicalDataType, usize)> for AggregateVec { fn from(v: (&AggregateType, &LogicalDataType, usize)) -> Self { let length = v.2; match (v.0, v.1) { - (AggregateType::Count, _) => Self::Count(vec![0; length]), + (AggregateType::Count, _) => Self::Count(vec![None; length]), (AggregateType::First, LogicalDataType::Integer) => { Self::FirstI64((vec![None; length], vec![None; length])) } @@ -644,6 +1498,26 @@ impl<'a> std::ops::AddAssign<&Scalar> for &mut Scalar { } } +impl std::ops::Add for Scalar { + type Output = Self; + + fn add(self, other: Self) -> Self { + match (self, other) { + (Self::Null, Self::Null) => Self::Null, + (Self::Null, Self::I64(_)) => other, + (Self::Null, Self::U64(_)) => other, + (Self::Null, Self::F64(_)) => other, + (Self::I64(_), Self::Null) => self, + (Self::I64(a), Self::I64(b)) => Self::I64(a + b), + (Self::U64(_), Self::Null) => self, + (Self::U64(a), Self::U64(b)) => Self::U64(a + b), + (Self::F64(_), Self::Null) => self, + (Self::F64(a), Self::F64(b)) => Self::F64(a + b), + (a, b) => panic!("{:?} + {:?} is an unsupported operation", a, b), + } + } +} + impl std::fmt::Display for Scalar { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -728,7 +1602,7 @@ pub enum Value<'a> { Scalar(Scalar), } -impl Value<'_> { +impl<'a> Value<'a> { pub fn is_null(&self) -> bool { matches!(self, Self::Null) } @@ -740,30 +1614,44 @@ impl Value<'_> { panic!("cannot unwrap Value to Scalar"); } - pub fn i64(&self) -> i64 { + pub fn i64(self) -> i64 { if let Self::Scalar(Scalar::I64(v)) = self { - return *v; + return v; } panic!("cannot unwrap Value to i64"); } - pub fn u64(&self) -> u64 { + pub fn u64(self) -> u64 { if let Self::Scalar(Scalar::U64(v)) = self { - return *v; + return v; } panic!("cannot unwrap Value to u64"); } - pub fn string(&self) -> &str { + pub fn f64(self) -> f64 { + if let Self::Scalar(Scalar::F64(v)) = self { + return v; + } + panic!("cannot unwrap Value to u64"); + } + + pub fn str(self) -> &'a str { if let Self::String(s) = self { return s; } panic!("cannot unwrap Value to String"); } - pub fn bool(&self) -> bool { + pub fn bytes(self) -> &'a [u8] { + if let Self::ByteArray(s) = self { + return s; + } + panic!("cannot unwrap Value to byte array"); + } + + pub fn bool(self) -> bool { if let Self::Boolean(b) = self { - return *b; + return b; } panic!("cannot unwrap Value to Scalar"); } @@ -792,6 +1680,45 @@ impl<'a> From<&'a str> for Value<'a> { } } +impl<'a> From> for Value<'a> { + fn from(v: Option<&'a str>) -> Self { + match v { + Some(s) => Self::String(s), + None => Self::Null, + } + } +} + +impl<'a> From<&'a [u8]> for Value<'a> { + fn from(v: &'a [u8]) -> Self { + Self::ByteArray(v) + } +} + +impl<'a> From> for Value<'a> { + fn from(v: Option<&'a [u8]>) -> Self { + match v { + Some(s) => 
@@ -792,6 +1680,45 @@ impl<'a> From<&'a str> for Value<'a> {
     }
 }
 
+impl<'a> From<Option<&'a str>> for Value<'a> {
+    fn from(v: Option<&'a str>) -> Self {
+        match v {
+            Some(s) => Self::String(s),
+            None => Self::Null,
+        }
+    }
+}
+
+impl<'a> From<&'a [u8]> for Value<'a> {
+    fn from(v: &'a [u8]) -> Self {
+        Self::ByteArray(v)
+    }
+}
+
+impl<'a> From<Option<&'a [u8]>> for Value<'a> {
+    fn from(v: Option<&'a [u8]>) -> Self {
+        match v {
+            Some(s) => Self::ByteArray(s),
+            None => Self::Null,
+        }
+    }
+}
+
+impl<'a> From<bool> for Value<'a> {
+    fn from(v: bool) -> Self {
+        Self::Boolean(v)
+    }
+}
+
+impl<'a> From<Option<bool>> for Value<'a> {
+    fn from(v: Option<bool>) -> Self {
+        match v {
+            Some(s) => Self::Boolean(s),
+            None => Self::Null,
+        }
+    }
+}
+
 // Implementations of From trait for various concrete types.
 macro_rules! scalar_from_impls {
     ($(($variant:ident, $type:ident),)*) => {
@@ -820,6 +1747,17 @@ scalar_from_impls! {
     (F64, f64),
 }
 
+impl std::ops::Add for Value<'_> {
+    type Output = Self;
+
+    fn add(self, other: Self) -> Self {
+        match (self, other) {
+            (Self::Scalar(a), Self::Scalar(b)) => Self::Scalar(a + b),
+            _ => panic!("unsupported operation on Value"),
+        }
+    }
+}
+
 /// Each variant is a typed vector of materialised values for a column.
 #[derive(Debug, PartialEq)]
 pub enum Values<'a> {
@@ -914,6 +1852,48 @@ impl<'a> Values<'a> {
             _ => panic!("value cannot be returned as i64"),
         }
     }
+
+    // Returns a value as a u64. Panics if not possible.
+    fn value_u64(&self, i: usize) -> u64 {
+        match &self {
+            Values::U64(c) => c[i],
+            Values::U64N(c) => c[i].unwrap(),
+            _ => panic!("value cannot be returned as u64"),
+        }
+    }
+
+    // Returns a value as an f64. Panics if not possible.
+    fn value_f64(&self, i: usize) -> f64 {
+        match &self {
+            Values::F64(c) => c[i],
+            Values::F64N(c) => c[i].unwrap(),
+            _ => panic!("value cannot be returned as f64"),
+        }
+    }
+
+    // Returns a value as a string. Panics if not possible.
+    fn value_str(&self, i: usize) -> &'a str {
+        match &self {
+            Values::String(c) => c[i].unwrap(),
+            _ => panic!("value cannot be returned as &str"),
+        }
+    }
+
+    // Returns a value as a binary array. Panics if not possible.
+    fn value_bytes(&self, i: usize) -> &'a [u8] {
+        match &self {
+            Values::ByteArray(c) => c[i].unwrap(),
+            _ => panic!("value cannot be returned as &[u8]"),
+        }
+    }
+
+    // Returns a value as a bool. Panics if not possible.
+    fn value_bool(&self, i: usize) -> bool {
+        match &self {
+            Values::Bool(c) => c[i].unwrap(),
+            _ => panic!("value cannot be returned as bool"),
+        }
+    }
 }
 
 /// Moves ownership of Values into an arrow `ArrayRef`.
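The `From<Option<T>>` impls above let nullable column data convert into `Value` directly, with `None` mapping onto the `Null` variant. A self-contained sketch of the same conversion shape (`MiniValue` is a stand-in, not the crate's `Value`):

```rust
#[derive(Debug, PartialEq)]
enum MiniValue<'a> {
    Null,
    String(&'a str),
}

impl<'a> From<Option<&'a str>> for MiniValue<'a> {
    fn from(v: Option<&'a str>) -> Self {
        match v {
            Some(s) => Self::String(s), // present value keeps its borrow
            None => Self::Null,         // missing value becomes Null
        }
    }
}

fn main() {
    let vals: Vec<Option<&str>> = vec![Some("cpu"), None];
    let converted: Vec<MiniValue<'_>> = vals.into_iter().map(MiniValue::from).collect();
    assert_eq!(converted, vec![MiniValue::String("cpu"), MiniValue::Null]);
}
```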