refactor: address lifetimes

parent 9cb18fd942
commit b0e0676f61
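The changes below all follow one pattern: lifetimes that were previously elided on borrowed types such as column::Scalar, column::Vector, and column::Aggregate are now written out, either as a named lifetime ('a) or the anonymous lifetime ('_). A minimal sketch of what the '_ form expresses, using simplified stand-in types rather than the project's real Segment/Scalar (illustration of the idiom only):

// Sketch only: simplified stand-ins, not the real Segment/Scalar types.
struct Scalar<'a> {
    value: &'a str, // borrows data owned by a Segment
}

struct Segment {
    data: String,
}

impl Segment {
    // Writing `-> Option<Scalar<'_>>` instead of `-> Option<Scalar>` makes it
    // visible in the signature that the returned Scalar borrows from `&self`.
    fn first_value(&self) -> Option<Scalar<'_>> {
        Some(Scalar { value: &self.data })
    }
}

fn main() {
    let seg = Segment { data: "abc".to_string() };
    let s = seg.first_value().unwrap();
    println!("{}", s.value); // `seg` must stay alive while `s` is in use
}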
@@ -160,8 +160,8 @@ impl Segment {
         &self,
         row_ids: &croaring::Bitmap,
         columns: &[String],
-    ) -> BTreeMap<String, column::Vector> {
-        let mut rows: BTreeMap<String, column::Vector> = BTreeMap::new();
+    ) -> BTreeMap<String, column::Vector<'_>> {
+        let mut rows: BTreeMap<String, column::Vector<'_>> = BTreeMap::new();
         if row_ids.is_empty() {
             // nothing to return
             return rows;
@@ -224,11 +224,12 @@ impl Segment {
     pub fn aggregate_by_group_with_hash<'a>(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'a>>)],
         group_columns: &[String],
         aggregates: &'a [(String, AggregateType)],
         window: i64,
-    ) -> BTreeMap<Vec<i64>, Vec<(&'a String, &'a AggregateType, Option<column::Aggregate>)>> {
+    ) -> BTreeMap<Vec<i64>, Vec<(&'a String, &'a AggregateType, Option<column::Aggregate<'a>>)>>
+    {
         // Build a hash table - essentially, scan columns for matching row ids,
         // emitting the encoded value for each column and track those value
         // combinations in a hashmap with running aggregates.
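The comment in this hunk describes the hash-grouping approach: scan the group columns for matching rows, use the combination of encoded values as a key, and keep a running aggregate per key. A rough, self-contained sketch of that idea with integer keys and a sum aggregate (illustrative only, not the actual Segment code):

use std::collections::BTreeMap;

// Sketch only: group rows by their encoded group-column values and keep a
// running sum per group, in the spirit of the comment above.
fn group_sum(rows: &[(Vec<i64>, i64)]) -> BTreeMap<Vec<i64>, i64> {
    let mut table: BTreeMap<Vec<i64>, i64> = BTreeMap::new();
    for (group_key, value) in rows {
        // Insert the key on first sight, then update the running aggregate.
        *table.entry(group_key.clone()).or_insert(0) += *value;
    }
    table
}

fn main() {
    let rows = vec![
        (vec![1, 10], 5),
        (vec![1, 10], 7),
        (vec![2, 10], 3),
    ];
    let table = group_sum(&rows);
    assert_eq!(table[&vec![1, 10]], 12);
    assert_eq!(table[&vec![2, 10]], 3);
}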
@@ -338,10 +339,10 @@ impl Segment {
         // hashMap is about 20% faster than BTreeMap in this case
         let mut hash_table: BTreeMap<
             Vec<i64>,
-            Vec<(&'a String, &'a AggregateType, Option<column::Aggregate>)>,
+            Vec<(&'a String, &'a AggregateType, Option<column::Aggregate<'_>>)>,
         > = BTreeMap::new();

-        let mut aggregate_row: Vec<(&str, Option<column::Scalar>)> =
+        let mut aggregate_row: Vec<(&str, Option<column::Scalar<'_>>)> =
             std::iter::repeat_with(|| ("", None))
                 .take(aggregate_itrs.len())
                 .collect();
@@ -373,7 +374,7 @@ impl Segment {
         let mut agg_results: Vec<(
             &'a String,
             &'a AggregateType,
-            Option<column::Aggregate>,
+            Option<column::Aggregate<'_>>,
         )> = Vec::with_capacity(aggregates.len());
         for (col_name, agg_type) in aggregates {
             agg_results.push((col_name, agg_type, None)); // switch out Aggregate for Option<column::Aggregate>
@@ -429,11 +430,11 @@ impl Segment {
     pub fn aggregate_by_group_using_sort(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         group_columns: &[String],
         aggregates: &[(String, AggregateType)],
         window: i64,
-    ) -> Vec<GroupedAggregates> {
+    ) -> Vec<GroupedAggregates<'_>> {
         log::debug!("aggregate_by_group_with_sort_unsorted called");

         if window > 0 {
@@ -563,7 +564,7 @@ impl Segment {
     pub fn aggregate_by_group_using_stream<'a>(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         group_columns: &[String],
         aggregates: &[(String, AggregateType)],
         window: i64,
@@ -640,7 +641,7 @@ impl Segment {
     // available and appropriately sorted this method will build a result set of
     // aggregates in a streaming way.
     pub fn stream_grouped_aggregates<'a>(
-        mut group_itrs: Vec<core::slice::Iter<i64>>,
+        mut group_itrs: Vec<core::slice::Iter<'_, i64>>,
         aggregate_cols: Vec<(&String, &AggregateType, impl column::AggregatableByRange)>,
         total_rows: usize,
         window: i64,
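stream_grouped_aggregates relies on the group columns being pre-sorted, so a group's aggregate can be emitted as soon as its key changes, with no hash table. A simplified sketch of that streaming pattern over a single sorted key column (illustrative only; the real method works over column iterators and multiple aggregate types):

// Sketch only: with input sorted by group key, emit a running sum each time
// the key changes instead of building a lookup table.
fn stream_sums(sorted: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    let mut current: Option<(i64, i64)> = None; // (group key, running sum)
    for &(key, value) in sorted {
        match current {
            Some((cur_key, running)) if cur_key == key => {
                current = Some((cur_key, running + value)); // same group
            }
            Some(finished) => {
                out.push(finished); // key changed: emit the finished group
                current = Some((key, value));
            }
            None => current = Some((key, value)), // very first row
        }
    }
    if let Some(finished) = current {
        out.push(finished); // emit the final group
    }
    out
}

fn main() {
    let rows = [(1, 2), (1, 3), (2, 10)];
    assert_eq!(stream_sums(&rows), vec![(1, 5), (2, 10)]);
}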
@@ -743,7 +744,11 @@ impl Segment {
         vec![]
     }

-    pub fn sum_column(&self, name: &str, row_ids: &mut croaring::Bitmap) -> Option<column::Scalar> {
+    pub fn sum_column(
+        &self,
+        name: &str,
+        row_ids: &mut croaring::Bitmap,
+    ) -> Option<column::Scalar<'_>> {
         if let Some(c) = self.column(name) {
             return c.sum_by_ids(row_ids);
         }
@@ -765,7 +770,7 @@ impl Segment {
     pub fn filter_by_predicates_eq(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
     ) -> Option<croaring::Bitmap> {
         if !self.meta.overlaps_time_range(time_range.0, time_range.1) {
             return None; // segment doesn't have time range
@@ -783,7 +788,7 @@ impl Segment {
     fn filter_by_predicates_eq_time(
         &self,
         time_range: (i64, i64),
-        predicates: Vec<(&str, Option<&column::Scalar>)>,
+        predicates: Vec<(&str, Option<&column::Scalar<'_>>)>,
     ) -> Option<croaring::Bitmap> {
         // Get all row_ids matching the time range:
         //
@@ -821,7 +826,7 @@ impl Segment {
     // meta row_ids bitmap.
     fn filter_by_predicates_eq_no_time(
         &self,
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
     ) -> Option<croaring::Bitmap> {
         if predicates.is_empty() {
             // In this case there are no predicates provided and we have no time
@@ -865,10 +870,10 @@ impl Segment {
     pub fn group_single_agg_by_predicate_eq(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         group_column: &String,
         aggregates: &Vec<(String, column::AggregateType)>,
-    ) -> BTreeMap<u32, Vec<((String, AggregateType), column::Aggregate)>> {
+    ) -> BTreeMap<u32, Vec<((String, AggregateType), column::Aggregate<'_>)>> {
         let mut grouped_results = BTreeMap::new();

         let filter_row_ids: croaring::Bitmap;
@@ -884,7 +889,7 @@ impl Segment {
             let mut filtered_row_ids = row_ids.and(&filter_row_ids);
             if !filtered_row_ids.is_empty() {
                 // First calculate all of the aggregates for this grouped value
-                let mut aggs: Vec<((String, AggregateType), column::Aggregate)> =
+                let mut aggs: Vec<((String, AggregateType), column::Aggregate<'_>)> =
                     Vec::with_capacity(aggregates.len());

                 for (col_name, agg) in aggregates {
@@ -1004,15 +1009,15 @@ impl<'a> Segments<'a> {
     pub fn read_filter_eq(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         select_columns: Vec<String>,
-    ) -> BTreeMap<String, column::Vector> {
+    ) -> BTreeMap<String, column::Vector<'_>> {
         let (min, max) = time_range;
         if max <= min {
             panic!("max <= min");
         }

-        let mut columns: BTreeMap<String, column::Vector> = BTreeMap::new();
+        let mut columns: BTreeMap<String, column::Vector<'_>> = BTreeMap::new();
         for segment in &self.segments {
             if !segment.meta.overlaps_time_range(min, max) {
                 continue; // segment doesn't have time range
@@ -1040,12 +1045,12 @@ impl<'a> Segments<'a> {
     pub fn read_group_eq(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         group_columns: Vec<String>,
         aggregates: Vec<(String, AggregateType)>,
         window: i64,
         strategy: &GroupingStrategy,
-    ) -> BTreeMap<Vec<String>, Vec<((String, column::AggregateType), column::Aggregate)>> {
+    ) -> BTreeMap<Vec<String>, Vec<((String, column::AggregateType), column::Aggregate<'_>)>> {
         let (min, max) = time_range;
         if max <= min {
             panic!("max <= min");
@@ -1090,12 +1095,12 @@ impl<'a> Segments<'a> {
     fn read_group_eq_hash(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         mut group_columns: Vec<String>,
         aggregates: Vec<(String, AggregateType)>,
         window: i64,
         concurrent: bool,
-    ) -> BTreeMap<Vec<String>, Vec<((String, column::AggregateType), column::Aggregate)>> {
+    ) -> BTreeMap<Vec<String>, Vec<((String, column::AggregateType), column::Aggregate<'_>)>> {
         if window > 0 {
             // add time column to the group key
             group_columns.push("time".to_string());
@@ -1176,12 +1181,12 @@ impl<'a> Segments<'a> {
     fn read_group_eq_sort(
         &self,
         time_range: (i64, i64),
-        predicates: &[(&str, Option<&column::Scalar>)],
+        predicates: &[(&str, Option<&column::Scalar<'_>>)],
         mut group_columns: Vec<String>,
         aggregates: Vec<(String, AggregateType)>,
         window: i64,
         concurrent: bool,
-    ) -> BTreeMap<Vec<String>, Vec<((String, column::AggregateType), column::Aggregate)>> {
+    ) -> BTreeMap<Vec<String>, Vec<((String, column::AggregateType), column::Aggregate<'_>)>> {
         if window > 0 {
             // add time column to the group key
             group_columns.push("time".to_string());
@@ -1313,12 +1318,12 @@ impl<'a> Segments<'a> {
     }

     /// Returns the minimum value for a column in a set of segments.
-    pub fn column_min(&self, column_name: &str) -> Option<column::Scalar> {
+    pub fn column_min(&self, column_name: &str) -> Option<column::Scalar<'_>> {
         if self.segments.is_empty() {
             return None;
         }

-        let mut min_min: Option<column::Scalar> = None;
+        let mut min_min: Option<column::Scalar<'_>> = None;
         for segment in &self.segments {
             if let Some(i) = segment.column_names().iter().position(|c| c == column_name) {
                 let min = segment.columns[i].min();
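column_min (and column_max below) reduce a per-segment extreme to a single value across all segments, which is the loop visible in this hunk. A compact sketch of that reduction, with plain i64 minima standing in for column::Scalar values (illustrative only):

// Sketch only: combine per-segment minima (None when a segment lacks the
// column) into one overall minimum, mirroring the `min_min` loop above.
fn overall_min(segment_mins: &[Option<i64>]) -> Option<i64> {
    let mut min_min: Option<i64> = None;
    for seg_min in segment_mins {
        match (min_min, *seg_min) {
            (None, m) => min_min = m, // first candidate seen
            (Some(cur), Some(m)) if m < cur => min_min = Some(m),
            _ => {} // keep the current minimum
        }
    }
    min_min
}

fn main() {
    assert_eq!(overall_min(&[Some(4), None, Some(2)]), Some(2));
    assert_eq!(overall_min(&[None, None]), None);
}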
@@ -1334,12 +1339,12 @@ impl<'a> Segments<'a> {
     }

     /// Returns the maximum value for a column in a set of segments.
-    pub fn column_max(&self, column_name: &str) -> Option<column::Scalar> {
+    pub fn column_max(&self, column_name: &str) -> Option<column::Scalar<'_>> {
         if self.segments.is_empty() {
             return None;
         }

-        let mut max_max: Option<column::Scalar> = None;
+        let mut max_max: Option<column::Scalar<'_>> = None;
         for segment in &self.segments {
             if let Some(i) = segment.column_names().iter().position(|c| c == column_name) {
                 let max = segment.columns[i].max();
@@ -1362,7 +1367,7 @@ impl<'a> Segments<'a> {
     /// If the time column has multiple max time values then the result is abitrary.
     ///
     /// TODO(edd): could return NULL value..
-    pub fn first(&self, column_name: &str) -> Option<(i64, Option<column::Scalar>, usize)> {
+    pub fn first(&self, column_name: &str) -> Option<(i64, Option<column::Scalar<'_>>, usize)> {
         // First let's find the segment with the earliest time range.
         // notice we order a < b on max time range.
         let segment = self
@@ -1394,7 +1399,7 @@ impl<'a> Segments<'a> {
     /// If the time column has multiple max time values then the result is abitrary.
     ///
     /// TODO(edd): could return NULL value..
-    pub fn last(&self, column_name: &str) -> Option<(i64, Option<column::Scalar>, usize)> {
+    pub fn last(&self, column_name: &str) -> Option<(i64, Option<column::Scalar<'_>>, usize)> {
         // First let's find the segment with the latest time range.
         // notice we order a > b on max time range.
         let segment = self